From fbb0bd9b06c60ff5f8550c38a3f879a30dc600b5 Mon Sep 17 00:00:00 2001 From: Kevin Kelly Date: Fri, 23 May 2025 16:44:04 -0400 Subject: [PATCH 01/21] WIP: context client_id messaging --- lib/mcp/server.rb | 116 +++++++++++++-------------- lib/mcp/transports/rack_transport.rb | 29 ++++--- 2 files changed, 76 insertions(+), 69 deletions(-) diff --git a/lib/mcp/server.rb b/lib/mcp/server.rb index a51a7a1..20f8287 100644 --- a/lib/mcp/server.rb +++ b/lib/mcp/server.rb @@ -133,18 +133,19 @@ def start_authenticated_rack(app, options = {}) end # Handle incoming JSON-RPC request - def handle_request(json_str) # rubocop:disable Metrics/MethodLength + def handle_request(json_str, context = {}) # rubocop:disable Metrics/MethodLength + client_id = context[:client_id] begin request = JSON.parse(json_str) rescue JSON::ParserError, TypeError - return send_error(-32_600, 'Invalid Request', nil) + return send_error(-32_600, 'Invalid Request', nil, client_id) end @logger.debug("Received request: #{request.inspect}") # Check if it's a valid JSON-RPC 2.0 request unless request['jsonrpc'] == '2.0' && request['method'] - return send_error(-32_600, 'Invalid Request', request['id']) + return send_error(-32_600, 'Invalid Request', request['id'], client_id) end method = request['method'] @@ -153,38 +154,38 @@ def handle_request(json_str) # rubocop:disable Metrics/MethodLength case method when 'ping' - send_result({}, id) + send_result({}, id, client_id) when 'initialize' - handle_initialize(params, id) + handle_initialize(params, id, context) when 'notifications/initialized' handle_initialized_notification when 'tools/list' - handle_tools_list(id) + handle_tools_list(id, context) when 'tools/call' - handle_tools_call(params, id) + handle_tools_call(params, id, context) when 'resources/list' - handle_resources_list(id) + handle_resources_list(id, context) when 'resources/read' - handle_resources_read(params, id) + handle_resources_read(params, id, context) when 'resources/subscribe' - handle_resources_subscribe(params, id) + handle_resources_subscribe(params, id, context) when 'resources/unsubscribe' - handle_resources_unsubscribe(params, id) + handle_resources_unsubscribe(params, id, context) else - send_error(-32_601, "Method not found: #{method}", id) + send_error(-32_601, "Method not found: #{method}", id, client_id) end rescue StandardError => e @logger.error("Error handling request: #{e.message}, #{e.backtrace.join("\n")}") - send_error(-32_600, "Internal error: #{e.message}", id) + send_error(-32_600, "Internal error: #{e.message}", id, client_id) end # Handle a JSON-RPC request and return the response as a JSON string - def handle_json_request(request) + def handle_json_request(request, context = {}) # Process the request if request.is_a?(String) - handle_request(request) + handle_request(request, context) else - handle_request(JSON.generate(request)) + handle_request(JSON.generate(request), context) end end @@ -219,10 +220,11 @@ def notify_resource_updated(uri) PROTOCOL_VERSION = '2024-11-05' - def handle_initialize(params, id) + def handle_initialize(params, id, context) # Store client capabilities for later use @client_capabilities = params['capabilities'] || {} client_info = params['clientInfo'] || {} + client_id = context[:client_id] # Log client information @logger.info("Client connected: #{client_info['name']} v#{client_info['version']}") @@ -240,17 +242,18 @@ def handle_initialize(params, id) @logger.info("Server response: #{response.inspect}") - send_result(response, id) + send_result(response, id, client_id) end # Handle a resource read - def handle_resources_read(params, id) + def handle_resources_read(params, id, context) uri = params['uri'] + client_id = context[:client_id] - return send_error(-32_602, 'Invalid params: missing resource URI', id) unless uri + return send_error(-32_602, 'Invalid params: missing resource URI', id, client_id) unless uri resource = @resources[uri] - return send_error(-32_602, "Resource not found: #{uri}", id) unless resource + return send_error(-32_602, "Resource not found: #{uri}", id, client_id) unless resource base_content = { uri: resource.uri } base_content[:mimeType] = resource.mime_type if resource.mime_type @@ -266,7 +269,7 @@ def handle_resources_read(params, id) } end - send_result(result, id) + send_result(result, id, client_id) end def handle_initialized_notification @@ -279,7 +282,8 @@ def handle_initialized_notification end # Handle tools/list request - def handle_tools_list(id) + def handle_tools_list(id, context) + client_id = context[:client_id] tools_list = @tools.values.map do |tool| { name: tool.tool_name, @@ -288,18 +292,19 @@ def handle_tools_list(id) } end - send_result({ tools: tools_list }, id) + send_result({ tools: tools_list }, id, client_id) end # Handle tools/call request - def handle_tools_call(params, id) + def handle_tools_call(params, id, context = {}) tool_name = params['name'] arguments = params['arguments'] || {} + client_id = context[:client_id] - return send_error(-32_602, 'Invalid params: missing tool name', id) unless tool_name + return send_error(-32_602, 'Invalid params: missing tool name', id, client_id) unless tool_name tool = @tools[tool_name] - return send_error(-32_602, "Tool not found: #{tool_name}", id) unless tool + return send_error(-32_602, "Tool not found: #{tool_name}", id, client_id) unless tool begin # Convert string keys to symbols for Ruby @@ -307,64 +312,59 @@ def handle_tools_call(params, id) result, metadata = tool.new.call_with_schema_validation!(**symbolized_args) # Format and send the result - send_formatted_result(result, id, metadata) + send_formatted_result(result, id, metadata, client_id) rescue FastMcp::Tool::InvalidArgumentsError => e @logger.error("Invalid arguments for tool #{tool_name}: #{e.message}") - send_error_result(e.message, id) + send_error_result(e.message, id, client_id) rescue StandardError => e @logger.error("Error calling tool #{tool_name}: #{e.message}") - send_error_result("#{e.message}, #{e.backtrace.join("\n")}", id) + send_error_result("#{e.message}, #{e.backtrace.join("\n")}", id, client_id) end end # Format and send successful result - def send_formatted_result(result, id, metadata) + def send_formatted_result(result, id, metadata, client_id) # Check if the result is already in the expected format if result.is_a?(Hash) && result.key?(:content) - send_result(result, id, metadata: metadata) + send_result(result, id, client_id, metadata: metadata) else - # Format the result according to the MCP specification - formatted_result = { - content: [{ type: 'text', text: result.to_s }], - isError: false - } - - send_result(formatted_result, id, metadata: metadata) + send_result({ content: result }, id, client_id, metadata: metadata) end end # Format and send error result - def send_error_result(message, id) + def send_error_result(message, id, client_id) # Format error according to the MCP specification error_result = { content: [{ type: 'text', text: "Error: #{message}" }], isError: true } - send_result(error_result, id) + send_response(error_result, client_id) end # Handle resources/list request - def handle_resources_list(id) + def handle_resources_list(id, context) + client_id = context[:client_id] resources_list = @resources.values.map(&:metadata) - - send_result({ resources: resources_list }, id) + send_result({ resources: resources_list }, id, client_id) end # Handle resources/subscribe request - def handle_resources_subscribe(params, id) + def handle_resources_subscribe(params, id, context) return unless @client_initialized uri = params['uri'] + client_id = context[:client_id] unless uri - send_error(-32_602, 'Invalid params: missing resource URI', id) + send_error(-32_602, 'Invalid params: missing resource URI', id, client_id) return end resource = @resources[uri] unless resource - send_error(-32_602, "Resource not found: #{uri}", id) + send_error(-32_602, "Resource not found: #{uri}", id, client_id) return end @@ -372,17 +372,18 @@ def handle_resources_subscribe(params, id) @resource_subscriptions[uri] ||= [] @resource_subscriptions[uri] << id - send_result({ subscribed: true }, id) + send_result({ subscribed: true }, id, client_id) end # Handle resources/unsubscribe request - def handle_resources_unsubscribe(params, id) + def handle_resources_unsubscribe(params, id, context) return unless @client_initialized uri = params['uri'] + client_id = context[:client_id] unless uri - send_error(-32_602, 'Invalid params: missing resource URI', id) + send_error(-32_602, 'Invalid params: missing resource URI', id, client_id) return end @@ -392,7 +393,7 @@ def handle_resources_unsubscribe(params, id) @resource_subscriptions.delete(uri) if @resource_subscriptions[uri].empty? end - send_result({ unsubscribed: true }, id) + send_result({ unsubscribed: true }, id, client_id) end # Notify clients about resource list changes @@ -409,9 +410,8 @@ def notify_resource_list_changed end # Send a JSON-RPC result response - def send_result(result, id, metadata: {}) + def send_result(result, id, client_id, metadata: {}) result[:_meta] = metadata if metadata.is_a?(Hash) && !metadata.empty? - response = { jsonrpc: '2.0', id: id, @@ -419,11 +419,11 @@ def send_result(result, id, metadata: {}) } @logger.info("Sending result: #{response.inspect}") - send_response(response) + send_response(response, client_id) end # Send a JSON-RPC error response - def send_error(code, message, id = nil) + def send_error(code, message, id = nil, client_id) response = { jsonrpc: '2.0', error: { @@ -433,14 +433,14 @@ def send_error(code, message, id = nil) id: id } - send_response(response) + send_response(response, client_id) end # Send a JSON-RPC response - def send_response(response) + def send_response(response, client_id) if @transport @logger.debug("Sending response: #{response.inspect}") - @transport.send_message(response) + @transport.send_message_to(client_id, response) else @logger.warn("No transport available to send response: #{response.inspect}") @logger.warn("Transport: #{@transport.inspect}, transport_klass: #{@transport_klass.inspect}") diff --git a/lib/mcp/transports/rack_transport.rb b/lib/mcp/transports/rack_transport.rb index cf7d455..b5339e4 100644 --- a/lib/mcp/transports/rack_transport.rb +++ b/lib/mcp/transports/rack_transport.rb @@ -72,19 +72,10 @@ def send_message(message) clients_to_remove = [] - @sse_clients.each do |client_id, client| - stream = client[:stream] - next if stream.nil? || (stream.respond_to?(:closed?) && stream.closed?) - - stream.write("data: #{json_message}\n\n") - stream.flush if stream.respond_to?(:flush) - rescue Errno::EPIPE, IOError => e - # Broken pipe or IO error - client disconnected - @logger.info("Client #{client_id} disconnected: #{e.message}") - clients_to_remove << client_id + @sse_clients.each_key do |client_id| + send_message_to(client_id, message) rescue StandardError => e @logger.error("Error sending message to client #{client_id}: #{e.message}") - # Remove the client if we can't send to it clients_to_remove << client_id end @@ -92,6 +83,20 @@ def send_message(message) clients_to_remove.each { |client_id| unregister_sse_client(client_id) } end + # Send a message to a specific SSE client + def send_message_to(client_id, message) + client = @sse_clients[client_id] + return unless client + + stream = client[:stream] + return if stream.nil? || (stream.respond_to?(:closed?) && stream.closed?) + + @logger.info("Client: #{client_id}, SSE Message: #{message}") + stream.write("data: #{JSON.generate(message)}\n\n") + stream.flush if stream.respond_to?(:flush) + nil + end + # Register a new SSE client def register_sse_client(client_id, stream) @logger.info("Registering SSE client: #{client_id}") @@ -503,6 +508,8 @@ def process_json_request(request) # Parse the request body body = request.body.read + context = extract_context_from_env(env) + context[:client_id] = extract_client_id(env) response = process_message(body) || [] @logger.info("Response: #{response}") From 992d269211582d0f3012328a744ae0a0b51dddd8 Mon Sep 17 00:00:00 2001 From: Kevin Kelly Date: Fri, 23 May 2025 16:48:00 -0400 Subject: [PATCH 02/21] WIP, context needs to be defined (separate work on rack env -> context will conflict a little here) --- lib/mcp/transports/base_transport.rb | 4 ++-- lib/mcp/transports/rack_transport.rb | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/lib/mcp/transports/base_transport.rb b/lib/mcp/transports/base_transport.rb index add85f9..cf73fbc 100644 --- a/lib/mcp/transports/base_transport.rb +++ b/lib/mcp/transports/base_transport.rb @@ -32,8 +32,8 @@ def send_message(message) # Process an incoming message # This is a helper method that can be used by subclasses - def process_message(message) - server.handle_json_request(message) + def process_message(message, context) + server.handle_json_request(message, context) end end end diff --git a/lib/mcp/transports/rack_transport.rb b/lib/mcp/transports/rack_transport.rb index b5339e4..645dc37 100644 --- a/lib/mcp/transports/rack_transport.rb +++ b/lib/mcp/transports/rack_transport.rb @@ -508,9 +508,8 @@ def process_json_request(request) # Parse the request body body = request.body.read - context = extract_context_from_env(env) - context[:client_id] = extract_client_id(env) - response = process_message(body) || [] + context = { client_id: extract_client_id(request.env) } + response = process_message(body, context) || [] @logger.info("Response: #{response}") [200, { 'Content-Type' => 'application/json' }, response] From 2aec510cf4aeafdaa4642e2fe543497cbb71d969 Mon Sep 17 00:00:00 2001 From: Kevin Kelly Date: Fri, 23 May 2025 17:18:35 -0400 Subject: [PATCH 03/21] send client id through sse --- lib/mcp/transports/rack_transport.rb | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/mcp/transports/rack_transport.rb b/lib/mcp/transports/rack_transport.rb index 645dc37..976a944 100644 --- a/lib/mcp/transports/rack_transport.rb +++ b/lib/mcp/transports/rack_transport.rb @@ -389,7 +389,10 @@ def setup_sse_connection(client_id, io, env) # Send endpoint information as the first message with query parameters endpoint = "#{@path_prefix}/#{@messages_route}" - endpoint += "?#{query_string}" if query_string + params = [] + params << query_string if query_string && !query_string.empty? + params << "client_id=#{client_id}" + endpoint += "?#{params.join('&')}" unless params.empty? @logger.debug("Sending endpoint information to client #{client_id}: #{endpoint}") io.write("event: endpoint\ndata: #{endpoint}\n\n") From 12ed63e4ccea8e2ca6841208556ee8d88250a0e6 Mon Sep 17 00:00:00 2001 From: Kevin Kelly Date: Sat, 24 May 2025 09:31:37 -0400 Subject: [PATCH 04/21] Connections work, tool call not responding --- lib/mcp/transports/rack_transport.rb | 29 +++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/lib/mcp/transports/rack_transport.rb b/lib/mcp/transports/rack_transport.rb index 976a944..c0e3a8d 100644 --- a/lib/mcp/transports/rack_transport.rb +++ b/lib/mcp/transports/rack_transport.rb @@ -86,7 +86,10 @@ def send_message(message) # Send a message to a specific SSE client def send_message_to(client_id, message) client = @sse_clients[client_id] - return unless client + if client.nil? + @logger.info("Client #{client_id} not found, skipping message") + return + end stream = client[:stream] return if stream.nil? || (stream.respond_to?(:closed?) && stream.closed?) @@ -99,12 +102,30 @@ def send_message_to(client_id, message) # Register a new SSE client def register_sse_client(client_id, stream) + existing_client = @sse_clients[client_id] + + if existing_client + @logger.info("Client #{client_id} already registered") + + if existing_client[:stream] != stream + @logger.info("New stream detected for client #{client_id}") + unregister_sse_client(client_id) + end + end + @logger.info("Registering SSE client: #{client_id}") @sse_clients[client_id] = { stream: stream, connected_at: Time.now } end # Unregister an SSE client def unregister_sse_client(client_id) + existing_client = @sse_clients[client_id] + return unless existing_client + + if existing_client[:stream].respond_to?(:close) && !existing_client[:stream].closed? + existing_client[:stream].close + end + @logger.info("Unregistering SSE client: #{client_id}") @sse_clients.delete(client_id) end @@ -307,11 +328,9 @@ def extract_client_id(env) @logger.info("Client connection from: #{user_agent} (#{browser_type})") # Handle reconnection - if client_id && @sse_clients.key?(client_id) - handle_client_reconnection(client_id, browser_type) - else + unless client_id # Generate a new client ID if none was provided - client_id ||= SecureRandom.uuid + client_id = SecureRandom.uuid @logger.info("New client connection: #{client_id} (#{browser_type})") end From 4ad8530967a5c6dd28d7c2d0250641c2e6856bd3 Mon Sep 17 00:00:00 2001 From: Kevin Kelly Date: Sat, 24 May 2025 10:17:17 -0400 Subject: [PATCH 05/21] Error responses fixed --- lib/mcp/server.rb | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/lib/mcp/server.rb b/lib/mcp/server.rb index 20f8287..9c3dc55 100644 --- a/lib/mcp/server.rb +++ b/lib/mcp/server.rb @@ -328,7 +328,13 @@ def send_formatted_result(result, id, metadata, client_id) if result.is_a?(Hash) && result.key?(:content) send_result(result, id, client_id, metadata: metadata) else - send_result({ content: result }, id, client_id, metadata: metadata) + # Format the result according to the MCP specification + formatted_result = { + content: [{ type: 'text', text: result.to_s }], + isError: false + } + + send_result(formatted_result, id, client_id, metadata: metadata) end end @@ -340,7 +346,7 @@ def send_error_result(message, id, client_id) isError: true } - send_response(error_result, client_id) + send_result(error_result, id, client_id) end # Handle resources/list request From 4297dde44aa5c4120bbdfe5c3dafd3334309a5d7 Mon Sep 17 00:00:00 2001 From: Kevin Kelly Date: Sat, 24 May 2025 23:05:24 -0400 Subject: [PATCH 06/21] Test coverage, send_message_to base_transport impl --- lib/mcp/transports/base_transport.rb | 4 + lib/mcp/transports/rack_transport.rb | 11 +- lib/mcp/transports/stdio_transport.rb | 5 + spec/mcp/server_spec.rb | 38 +++--- .../authenticated_rack_transport_spec.rb | 12 +- spec/mcp/transports/rack_transport_spec.rb | 121 ++++++++++++++++-- 6 files changed, 153 insertions(+), 38 deletions(-) diff --git a/lib/mcp/transports/base_transport.rb b/lib/mcp/transports/base_transport.rb index cf73fbc..fb1d9f2 100644 --- a/lib/mcp/transports/base_transport.rb +++ b/lib/mcp/transports/base_transport.rb @@ -30,6 +30,10 @@ def send_message(message) raise NotImplementedError, "#{self.class} must implement #send_message" end + def send_message_to(client_id, message) + raise NotImplementedError, "#{self.class} must implement #send_message_to" + end + # Process an incoming message # This is a helper method that can be used by subclasses def process_message(message, context) diff --git a/lib/mcp/transports/rack_transport.rb b/lib/mcp/transports/rack_transport.rb index c0e3a8d..cd5314f 100644 --- a/lib/mcp/transports/rack_transport.rb +++ b/lib/mcp/transports/rack_transport.rb @@ -91,12 +91,15 @@ def send_message_to(client_id, message) return end + json_message = message.is_a?(String) ? message : JSON.generate(message) stream = client[:stream] - return if stream.nil? || (stream.respond_to?(:closed?) && stream.closed?) - @logger.info("Client: #{client_id}, SSE Message: #{message}") - stream.write("data: #{JSON.generate(message)}\n\n") - stream.flush if stream.respond_to?(:flush) + if stream.nil? || (stream.respond_to?(:closed?) && stream.closed?) + unregister_sse_client(client_id) + else + stream.write("data: #{json_message}\n\n") + stream.flush if stream.respond_to?(:flush) + end nil end diff --git a/lib/mcp/transports/stdio_transport.rb b/lib/mcp/transports/stdio_transport.rb index 2941169..b3246aa 100644 --- a/lib/mcp/transports/stdio_transport.rb +++ b/lib/mcp/transports/stdio_transport.rb @@ -43,6 +43,11 @@ def send_message(message) $stdout.flush end + # stdio transport does not support sending to specific clients + def send_message_to(client_id, message) + send_message(message) + end + private # Send a JSON-RPC error response diff --git a/spec/mcp/server_spec.rb b/spec/mcp/server_spec.rb index 2506fa5..2a0f937 100644 --- a/spec/mcp/server_spec.rb +++ b/spec/mcp/server_spec.rb @@ -34,6 +34,8 @@ def call(**_args) end describe '#handle_request' do + let(:client_id) { 'test-client-id' } + let(:context) { { client_id: client_id } } let(:test_tool_class) do Class.new(FastMcp::Tool) do def self.name @@ -90,8 +92,8 @@ def call(user:) it 'responds with an empty result' do request = { jsonrpc: '2.0', method: 'ping', id: 1 }.to_json - expect(server).to receive(:send_result).with({}, 1) - server.handle_request(request) + expect(server).to receive(:send_result).with({}, 1, client_id) + server.handle_request(request,context) end end @@ -115,8 +117,8 @@ def call(user:) name: server.name, version: server.version } - }, 1) - server.handle_request(request) + }, 1, client_id) + server.handle_request(request, context) end end @@ -144,7 +146,7 @@ def call(user:) expect(profile_tool[:inputSchema][:properties][:user][:properties]).to have_key(:last_name) end - server.handle_request(request) + server.handle_request(request, context) end end @@ -163,9 +165,10 @@ def call(user:) expect(server).to receive(:send_result).with( { content: [{ text: 'Hello, World!', type: 'text' }], isError: false }, 1, + client_id, metadata: {} ) - server.handle_request(request) + server.handle_request(request, context) end it 'calls a tool with nested properties' do @@ -187,9 +190,10 @@ def call(user:) expect(server).to receive(:send_result).with( { content: [{ text: 'John Doe', type: 'text' }], isError: false }, 1, + client_id, metadata: {} ) - server.handle_request(request) + server.handle_request(request, context) end it "returns an error if the tool doesn't exist" do @@ -203,8 +207,8 @@ def call(user:) id: 1 }.to_json - expect(server).to receive(:send_error).with(-32_602, 'Tool not found: non-existent-tool', 1) - server.handle_request(request) + expect(server).to receive(:send_error).with(-32_602, 'Tool not found: non-existent-tool', 1, client_id) + server.handle_request(request, context) end it 'returns an error if the tool name is missing' do @@ -217,8 +221,8 @@ def call(user:) id: 1 }.to_json - expect(server).to receive(:send_error).with(-32_602, 'Invalid params: missing tool name', 1) - server.handle_request(request) + expect(server).to receive(:send_error).with(-32_602, 'Invalid params: missing tool name', 1, client_id) + server.handle_request(request, context) end end @@ -226,22 +230,22 @@ def call(user:) it 'returns an error for an unknown method' do request = { jsonrpc: '2.0', method: 'unknown', id: 1 }.to_json - expect(server).to receive(:send_error).with(-32_601, 'Method not found: unknown', 1) - server.handle_request(request) + expect(server).to receive(:send_error).with(-32_601, 'Method not found: unknown', 1, client_id) + server.handle_request(request, context) end it 'returns an error for an invalid JSON-RPC request' do request = { id: 1 }.to_json - expect(server).to receive(:send_error).with(-32_600, 'Invalid Request', 1) - server.handle_request(request) + expect(server).to receive(:send_error).with(-32_600, 'Invalid Request', 1, client_id) + server.handle_request(request, context) end it 'returns an error for an invalid JSON request' do request = 'invalid json' - expect(server).to receive(:send_error).with(-32_600, 'Invalid Request', nil) - server.handle_request(request) + expect(server).to receive(:send_error).with(-32_600, 'Invalid Request', nil, client_id) + server.handle_request(request, context) end end end diff --git a/spec/mcp/transports/authenticated_rack_transport_spec.rb b/spec/mcp/transports/authenticated_rack_transport_spec.rb index 43fe9a8..6f33730 100644 --- a/spec/mcp/transports/authenticated_rack_transport_spec.rb +++ b/spec/mcp/transports/authenticated_rack_transport_spec.rb @@ -44,6 +44,8 @@ end describe '#call' do + let(:client_id) { 'test-client-id' } + let(:context) { { client_id: client_id } } context 'with valid authentication' do it 'passes the request to parent when token is valid for non-MCP paths' do env = { @@ -64,14 +66,15 @@ 'CONTENT_TYPE' => 'application/json', 'REMOTE_ADDR' => '127.0.0.1', 'rack.input' => StringIO.new(json_message), - 'HTTP_AUTHORIZATION' => "Bearer #{auth_token}" + 'HTTP_AUTHORIZATION' => "Bearer #{auth_token}", + 'QUERY_STRING' => "client_id=#{client_id}" } expect(server).to receive(:transport=).with(transport) # The RackTransport class will call server.handle_json_request with the message json_response = '{"jsonrpc":"2.0","result":{},"id":1}' - expect(server).to receive(:handle_json_request).with(json_message).and_return(json_response) + expect(server).to receive(:handle_json_request).with(json_message, context).and_return(json_response) # For MCP paths, we don't expect app.call to be invoked expect(app).not_to receive(:call) @@ -319,12 +322,13 @@ 'HTTP_ORIGIN' => 'http://localhost', 'REMOTE_ADDR' => '127.0.0.1', 'HTTP_AUTHORIZATION' => "Bearer #{auth_token}", - 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}') + 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}'), + 'QUERY_STRING' => "client_id=#{client_id}" } expect(server).to receive(:transport=).with(transport) expect(server).to receive(:handle_json_request) - .with('{"jsonrpc":"2.0","method":"ping","id":1}') + .with('{"jsonrpc":"2.0","method":"ping","id":1}', context) .and_return('{"jsonrpc":"2.0","result":{},"id":1}') result = transport.call(env) diff --git a/spec/mcp/transports/rack_transport_spec.rb b/spec/mcp/transports/rack_transport_spec.rb index 7ad2f06..37584f1 100644 --- a/spec/mcp/transports/rack_transport_spec.rb +++ b/spec/mcp/transports/rack_transport_spec.rb @@ -124,8 +124,10 @@ # Add a mock SSE client that raises an error client_stream = double('stream') expect(client_stream).to receive(:respond_to?).with(:closed?).and_return(true) - expect(client_stream).to receive(:closed?).and_return(false) + expect(client_stream).to receive(:closed?).twice.and_return(false) # once for write check, once unregister expect(client_stream).to receive(:write).and_raise(StandardError.new('Test error')) + expect(client_stream).to receive(:respond_to?).with(:close).and_return(true) + expect(client_stream).to receive(:close) # unregister close transport.instance_variable_set(:@sse_clients, { 'test-client' => { stream: client_stream } }) @@ -141,7 +143,94 @@ end end + describe '#send_message_to' do + let(:client_id) { 'test-client' } + let(:client_stream) { double('stream') } + let(:client_id_2) { 'test-client-2' } + let(:client_stream_2) { double('stream-2') } + + before do + transport.instance_variable_set(:@sse_clients, { + client_id => { stream: client_stream }, + client_id_2 => { stream: client_stream_2 } + }) + end + + it 'sends a message to a specific client' do + expect(client_stream).to receive(:respond_to?).with(:closed?).and_return(true) + expect(client_stream).to receive(:closed?).and_return(false) + expect(client_stream).to receive(:write).with("data: {\"test\":\"message\"}\n\n") + expect(client_stream).to receive(:respond_to?).with(:flush).and_return(true) + expect(client_stream).to receive(:flush) + + expect(client_stream_2).not_to receive(:write) + + transport.send_message_to(client_id, { test: 'message' }) + end + + it 'handles string messages' do + expect(client_stream).to receive(:respond_to?).with(:closed?).and_return(true) + expect(client_stream).to receive(:closed?).and_return(false) + expect(client_stream).to receive(:write).with("data: test message\n\n") + expect(client_stream).to receive(:respond_to?).with(:flush).and_return(true) + expect(client_stream).to receive(:flush) + + transport.send_message_to(client_id, 'test message') + end + + it 'skips sending if client is not found' do + expect(logger).to receive(:info).with(/Client nonexistent-client not found, skipping message/) + transport.send_message_to('nonexistent-client', { test: 'message' }) + end + + it 'skips sending if stream is nil' do + transport.instance_variable_set(:@sse_clients, { + client_id => { stream: nil } + }) + transport.send_message_to(client_id, { test: 'message' }) + end + + it 'skips sending if stream is closed' do + expect(client_stream).to receive(:respond_to?).with(:closed?).and_return(true) + expect(client_stream).to receive(:closed?).and_return(true) + expect(transport).to receive(:unregister_sse_client).with(client_id) + expect(client_stream).to receive(:write).never + + transport.send_message_to(client_id, { test: 'message' }) + end + + it 'unregisters client if stream is closed' do + expect(client_stream).to receive(:respond_to?).with(:closed?).and_return(true) + expect(client_stream).to receive(:closed?).and_return(true) + expect(transport).to receive(:unregister_sse_client).with(client_id) + + transport.send_message_to(client_id, { test: 'message' }) + end + end + + describe '#unregister_sse_client' do + let(:client_id) { 'test-client' } + let(:client_stream) { double('stream') } + + it 'closes the stream' do + expect(client_stream).to receive(:respond_to?).with(:close).and_return(true) + expect(client_stream).to receive(:closed?).and_return(false) + expect(client_stream).to receive(:close) + transport.instance_variable_set(:@sse_clients, { 'test-client' => { stream: client_stream } }) + transport.unregister_sse_client('test-client') + end + + it 'removes a client from the sse_clients hash' do + transport.instance_variable_set(:@sse_clients, { 'test-client' => { stream: client_stream } }) + transport.unregister_sse_client('test-client') + expect(transport.sse_clients).to be_empty + end + end + describe '#call' do + let(:client_id) { 'test-client-id' } + let(:context) { { client_id: client_id } } + it 'passes non-MCP requests to the app' do env = { 'PATH_INFO' => '/not-mcp' } expect(app).to receive(:call).with(env).and_return([200, {}, ['OK']]) @@ -170,7 +259,8 @@ 'PATH_INFO' => '/mcp/messages', 'REQUEST_METHOD' => 'POST', 'HTTP_ORIGIN' => 'http://localhost', - 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}') + 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}'), + 'QUERY_STRING' => "client_id=#{client_id}" } # Create a proper request double that includes necessary methods @@ -178,15 +268,16 @@ ip: '127.0.0.1', path: '/mcp/messages', post?: true, - params: {}, + params: { 'client_id' => client_id }, body: instance_double(StringIO, read: '{"jsonrpc":"2.0","method":"ping","id":1}'), - host: 'localhost' + host: 'localhost', + env: env ) allow(Rack::Request).to receive(:new).with(env).and_return(request) expect(server).to receive(:transport=).with(transport) expect(server).to receive(:handle_json_request) - .with('{"jsonrpc":"2.0","method":"ping","id":1}') + .with('{"jsonrpc":"2.0","method":"ping","id":1}', context) .and_return('{"jsonrpc":"2.0","result":{},"id":1}') result = transport.call(env) @@ -274,13 +365,14 @@ 'REQUEST_METHOD' => 'POST', 'HTTP_ORIGIN' => 'https://sub.example.com', 'REMOTE_ADDR' => '127.0.0.1', - 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}') + 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}'), + 'QUERY_STRING' => "client_id=#{client_id}" } expect(server).to receive(:transport=).with(transport) # Mock the behavior for a valid Origin expect(server).to receive(:handle_json_request) - .with('{"jsonrpc":"2.0","method":"ping","id":1}') + .with('{"jsonrpc":"2.0","method":"ping","id":1}', context) .and_return('{"jsonrpc":"2.0","result":{},"id":1}') result = transport.call(env) @@ -317,12 +409,13 @@ 'REQUEST_METHOD' => 'POST', 'HTTP_REFERER' => 'http://localhost/some/path', 'REMOTE_ADDR' => '127.0.0.1', - 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}') + 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}'), + 'QUERY_STRING' => "client_id=#{client_id}" } expect(server).to receive(:transport=).with(transport) expect(server).to receive(:handle_json_request) - .with('{"jsonrpc":"2.0","method":"ping","id":1}') + .with('{"jsonrpc":"2.0","method":"ping","id":1}', context) .and_return('{"jsonrpc":"2.0","result":{},"id":1}') result = transport.call(env) @@ -335,12 +428,13 @@ 'REQUEST_METHOD' => 'POST', 'HTTP_HOST' => 'localhost:3000', 'REMOTE_ADDR' => '127.0.0.1', - 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}') + 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}'), + 'QUERY_STRING' => "client_id=#{client_id}" } expect(server).to receive(:transport=).with(transport) expect(server).to receive(:handle_json_request) - .with('{"jsonrpc":"2.0","method":"ping","id":1}') + .with('{"jsonrpc":"2.0","method":"ping","id":1}', context) .and_return('{"jsonrpc":"2.0","result":{},"id":1}') result = transport.call(env) @@ -447,13 +541,14 @@ 'REQUEST_METHOD' => 'POST', 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}'), 'CONTENT_TYPE' => 'application/json', - 'REMOTE_ADDR' => '127.0.0.1' + 'REMOTE_ADDR' => '127.0.0.1', + 'QUERY_STRING' => "client_id=#{client_id}" } expect(server).to receive(:transport=).with(transport) # Mock the server's handle_json_request method expect(server).to receive(:handle_json_request) - .with('{"jsonrpc":"2.0","method":"ping","id":1}') + .with('{"jsonrpc":"2.0","method":"ping","id":1}', context) .and_return('{"jsonrpc":"2.0","result":{},"id":1}') result = transport.call(env) From 61060a5c740b5dacb93559a5fed72da45ea7a8c9 Mon Sep 17 00:00:00 2001 From: Kevin Kelly Date: Sat, 24 May 2025 23:25:00 -0400 Subject: [PATCH 07/21] wip shift context -> headers --- lib/mcp/server.rb | 39 ++++++++++--------- spec/mcp/server_spec.rb | 22 +++++------ .../authenticated_rack_transport_spec.rb | 8 +--- 3 files changed, 33 insertions(+), 36 deletions(-) diff --git a/lib/mcp/server.rb b/lib/mcp/server.rb index aa11fe6..d111d94 100644 --- a/lib/mcp/server.rb +++ b/lib/mcp/server.rb @@ -134,6 +134,7 @@ def start_authenticated_rack(app, options = {}) # Handle incoming JSON-RPC request def handle_request(json_str, headers: {}) # rubocop:disable Metrics/MethodLength + client_id = headers[:client_id] begin request = JSON.parse(json_str) rescue JSON::ParserError, TypeError @@ -155,21 +156,21 @@ def handle_request(json_str, headers: {}) # rubocop:disable Metrics/MethodLength when 'ping' send_result({}, id, client_id) when 'initialize' - handle_initialize(params, id, context) + handle_initialize(params, headers, id) when 'notifications/initialized' handle_initialized_notification when 'tools/list' - handle_tools_list(id, context) + handle_tools_list(headers, id) when 'tools/call' handle_tools_call(params, headers, id) when 'resources/list' - handle_resources_list(id, context) + handle_resources_list(headers, id) when 'resources/read' - handle_resources_read(params, id, context) + handle_resources_read(params, headers, id) when 'resources/subscribe' - handle_resources_subscribe(params, id, context) + handle_resources_subscribe(params, headers, id) when 'resources/unsubscribe' - handle_resources_unsubscribe(params, id, context) + handle_resources_unsubscribe(params, headers, id) else send_error(-32_601, "Method not found: #{method}", id, client_id) end @@ -219,11 +220,11 @@ def notify_resource_updated(uri) PROTOCOL_VERSION = '2024-11-05' - def handle_initialize(params, id, context) + def handle_initialize(params, headers, id) # Store client capabilities for later use @client_capabilities = params['capabilities'] || {} client_info = params['clientInfo'] || {} - client_id = context[:client_id] + client_id = headers[:client_id] # Log client information @logger.info("Client connected: #{client_info['name']} v#{client_info['version']}") @@ -245,9 +246,9 @@ def handle_initialize(params, id, context) end # Handle a resource read - def handle_resources_read(params, id, context) + def handle_resources_read(params, headers, id) uri = params['uri'] - client_id = context[:client_id] + client_id = headers[:client_id] return send_error(-32_602, 'Invalid params: missing resource URI', id, client_id) unless uri @@ -281,8 +282,8 @@ def handle_initialized_notification end # Handle tools/list request - def handle_tools_list(id, context) - client_id = context[:client_id] + def handle_tools_list(headers, id) + client_id = headers[:client_id] tools_list = @tools.values.map do |tool| { name: tool.tool_name, @@ -298,7 +299,7 @@ def handle_tools_list(id, context) def handle_tools_call(params, headers, id) tool_name = params['name'] arguments = params['arguments'] || {} - client_id = context[:client_id] + client_id = headers[:client_id] return send_error(-32_602, 'Invalid params: missing tool name', id, client_id) unless tool_name @@ -349,18 +350,18 @@ def send_error_result(message, id, client_id) end # Handle resources/list request - def handle_resources_list(id, context) - client_id = context[:client_id] + def handle_resources_list(headers, id) + client_id = headers[:client_id] resources_list = @resources.values.map(&:metadata) send_result({ resources: resources_list }, id, client_id) end # Handle resources/subscribe request - def handle_resources_subscribe(params, id, context) + def handle_resources_subscribe(params, headers, id) return unless @client_initialized uri = params['uri'] - client_id = context[:client_id] + client_id = headers[:client_id] unless uri send_error(-32_602, 'Invalid params: missing resource URI', id, client_id) @@ -381,11 +382,11 @@ def handle_resources_subscribe(params, id, context) end # Handle resources/unsubscribe request - def handle_resources_unsubscribe(params, id, context) + def handle_resources_unsubscribe(params, headers, id) return unless @client_initialized uri = params['uri'] - client_id = context[:client_id] + client_id = headers[:client_id] unless uri send_error(-32_602, 'Invalid params: missing resource URI', id, client_id) diff --git a/spec/mcp/server_spec.rb b/spec/mcp/server_spec.rb index 2a0f937..299c659 100644 --- a/spec/mcp/server_spec.rb +++ b/spec/mcp/server_spec.rb @@ -35,7 +35,7 @@ def call(**_args) describe '#handle_request' do let(:client_id) { 'test-client-id' } - let(:context) { { client_id: client_id } } + let(:headers) { { client_id: client_id } } let(:test_tool_class) do Class.new(FastMcp::Tool) do def self.name @@ -93,7 +93,7 @@ def call(user:) request = { jsonrpc: '2.0', method: 'ping', id: 1 }.to_json expect(server).to receive(:send_result).with({}, 1, client_id) - server.handle_request(request,context) + server.handle_request(request,headers: headers) end end @@ -118,7 +118,7 @@ def call(user:) version: server.version } }, 1, client_id) - server.handle_request(request, context) + server.handle_request(request, headers: headers) end end @@ -146,7 +146,7 @@ def call(user:) expect(profile_tool[:inputSchema][:properties][:user][:properties]).to have_key(:last_name) end - server.handle_request(request, context) + server.handle_request(request, headers: headers) end end @@ -168,7 +168,7 @@ def call(user:) client_id, metadata: {} ) - server.handle_request(request, context) + server.handle_request(request, headers: headers) end it 'calls a tool with nested properties' do @@ -193,7 +193,7 @@ def call(user:) client_id, metadata: {} ) - server.handle_request(request, context) + server.handle_request(request, headers: headers) end it "returns an error if the tool doesn't exist" do @@ -208,7 +208,7 @@ def call(user:) }.to_json expect(server).to receive(:send_error).with(-32_602, 'Tool not found: non-existent-tool', 1, client_id) - server.handle_request(request, context) + server.handle_request(request, headers: headers) end it 'returns an error if the tool name is missing' do @@ -222,7 +222,7 @@ def call(user:) }.to_json expect(server).to receive(:send_error).with(-32_602, 'Invalid params: missing tool name', 1, client_id) - server.handle_request(request, context) + server.handle_request(request, headers: headers) end end @@ -231,21 +231,21 @@ def call(user:) request = { jsonrpc: '2.0', method: 'unknown', id: 1 }.to_json expect(server).to receive(:send_error).with(-32_601, 'Method not found: unknown', 1, client_id) - server.handle_request(request, context) + server.handle_request(request, headers: headers) end it 'returns an error for an invalid JSON-RPC request' do request = { id: 1 }.to_json expect(server).to receive(:send_error).with(-32_600, 'Invalid Request', 1, client_id) - server.handle_request(request, context) + server.handle_request(request, headers: headers) end it 'returns an error for an invalid JSON request' do request = 'invalid json' expect(server).to receive(:send_error).with(-32_600, 'Invalid Request', nil, client_id) - server.handle_request(request, context) + server.handle_request(request, headers: headers) end end end diff --git a/spec/mcp/transports/authenticated_rack_transport_spec.rb b/spec/mcp/transports/authenticated_rack_transport_spec.rb index f84b4c6..dbe1b69 100644 --- a/spec/mcp/transports/authenticated_rack_transport_spec.rb +++ b/spec/mcp/transports/authenticated_rack_transport_spec.rb @@ -44,8 +44,6 @@ end describe '#call' do - let(:client_id) { 'test-client-id' } - let(:context) { { client_id: client_id } } context 'with valid authentication' do it 'passes the request to parent when token is valid for non-MCP paths' do env = { @@ -66,8 +64,7 @@ 'CONTENT_TYPE' => 'application/json', 'REMOTE_ADDR' => '127.0.0.1', 'rack.input' => StringIO.new(json_message), - 'HTTP_AUTHORIZATION' => "Bearer #{auth_token}", - 'QUERY_STRING' => "client_id=#{client_id}" + 'HTTP_AUTHORIZATION' => "Bearer #{auth_token}" } expect(server).to receive(:transport=).with(transport) @@ -322,8 +319,7 @@ 'HTTP_ORIGIN' => 'http://localhost', 'REMOTE_ADDR' => '127.0.0.1', 'HTTP_AUTHORIZATION' => "Bearer #{auth_token}", - 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}'), - 'QUERY_STRING' => "client_id=#{client_id}" + 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}') } expect(server).to receive(:transport=).with(transport) From c5bbb3dc61ad807b61891afd216c495907326090 Mon Sep 17 00:00:00 2001 From: Kevin Kelly Date: Sat, 24 May 2025 23:38:10 -0400 Subject: [PATCH 08/21] comment cleanup --- lib/mcp/transports/rack_transport.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/mcp/transports/rack_transport.rb b/lib/mcp/transports/rack_transport.rb index ef8ce9c..a3e7302 100644 --- a/lib/mcp/transports/rack_transport.rb +++ b/lib/mcp/transports/rack_transport.rb @@ -330,7 +330,8 @@ def extract_client_id(env) browser_type = detect_browser_type(user_agent) @logger.info("Client connection from: #{user_agent} (#{browser_type})") - # Handle reconnection + # WIP: reconnection wasn't necessary, removed temporarily but need to verify + unless client_id # Generate a new client ID if none was provided client_id = SecureRandom.uuid From a9b50d3121f9c009106946ce91b0277d13dedbee Mon Sep 17 00:00:00 2001 From: Kevin Kelly Date: Mon, 26 May 2025 17:45:13 -0400 Subject: [PATCH 09/21] update comment --- lib/mcp/transports/rack_transport.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/mcp/transports/rack_transport.rb b/lib/mcp/transports/rack_transport.rb index a3e7302..9ee087e 100644 --- a/lib/mcp/transports/rack_transport.rb +++ b/lib/mcp/transports/rack_transport.rb @@ -330,8 +330,6 @@ def extract_client_id(env) browser_type = detect_browser_type(user_agent) @logger.info("Client connection from: #{user_agent} (#{browser_type})") - # WIP: reconnection wasn't necessary, removed temporarily but need to verify - unless client_id # Generate a new client ID if none was provided client_id = SecureRandom.uuid @@ -393,6 +391,7 @@ def handle_rack_hijack_sse(env) end # Set up the SSE connection + # If SSE connection already exists for a client through a different IO, it will be closed and a new one will be established def setup_sse_connection(client_id, io, env) # Send headers @logger.debug("Sending HTTP headers for SSE connection #{client_id}") From 99836d4efd2cc20103fb05eed6e50b5a550d50ea Mon Sep 17 00:00:00 2001 From: Kevin Kelly Date: Mon, 26 May 2025 17:47:59 -0400 Subject: [PATCH 10/21] absorb reconnection into registration of sse --- lib/mcp/transports/rack_transport.rb | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/lib/mcp/transports/rack_transport.rb b/lib/mcp/transports/rack_transport.rb index 9ee087e..67744d8 100644 --- a/lib/mcp/transports/rack_transport.rb +++ b/lib/mcp/transports/rack_transport.rb @@ -113,6 +113,9 @@ def register_sse_client(client_id, stream) if existing_client[:stream] != stream @logger.info("New stream detected for client #{client_id}") unregister_sse_client(client_id) + + # Small delay to ensure the old connection is fully closed + sleep 0.1 end end @@ -359,21 +362,6 @@ def detect_browser_type(user_agent) end end - # Handle client reconnection - def handle_client_reconnection(client_id, browser_type) - @logger.info("Client #{client_id} is reconnecting (#{browser_type})") - old_client = @sse_clients[client_id] - begin - old_client[:stream].close if old_client[:stream].respond_to?(:close) && !old_client[:stream].closed? - rescue StandardError => e - @logger.error("Error closing old connection for client #{client_id}: #{e.message}") - end - unregister_sse_client(client_id) - - # Small delay to ensure the old connection is fully closed - sleep 0.1 - end - # Handle SSE with Rack hijacking (e.g., Puma) def handle_rack_hijack_sse(env) client_id = extract_client_id(env) From 1c01013fefb500db2754060ba5e3cf393d6c6e01 Mon Sep 17 00:00:00 2001 From: Kevin Kelly Date: Mon, 26 May 2025 17:59:05 -0400 Subject: [PATCH 11/21] whitespace diff cleanup --- lib/mcp/server.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/mcp/server.rb b/lib/mcp/server.rb index d111d94..4b7fb42 100644 --- a/lib/mcp/server.rb +++ b/lib/mcp/server.rb @@ -353,6 +353,7 @@ def send_error_result(message, id, client_id) def handle_resources_list(headers, id) client_id = headers[:client_id] resources_list = @resources.values.map(&:metadata) + send_result({ resources: resources_list }, id, client_id) end @@ -418,6 +419,7 @@ def notify_resource_list_changed # Send a JSON-RPC result response def send_result(result, id, client_id, metadata: {}) result[:_meta] = metadata if metadata.is_a?(Hash) && !metadata.empty? + response = { jsonrpc: '2.0', id: id, From c9839b4fc3b9e734d4d3f65bcd8fc40890611790 Mon Sep 17 00:00:00 2001 From: Kevin Kelly Date: Mon, 26 May 2025 18:12:25 -0400 Subject: [PATCH 12/21] Add reconnection tests --- lib/mcp/transports/rack_transport.rb | 11 +++---- spec/mcp/transports/rack_transport_spec.rb | 37 ++++++++++++++++++++++ 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/lib/mcp/transports/rack_transport.rb b/lib/mcp/transports/rack_transport.rb index 67744d8..e5dcb43 100644 --- a/lib/mcp/transports/rack_transport.rb +++ b/lib/mcp/transports/rack_transport.rb @@ -109,14 +109,13 @@ def register_sse_client(client_id, stream) if existing_client @logger.info("Client #{client_id} already registered") + return if existing_client[:stream] == stream - if existing_client[:stream] != stream - @logger.info("New stream detected for client #{client_id}") - unregister_sse_client(client_id) + @logger.info("New stream detected for client #{client_id}") + unregister_sse_client(client_id) - # Small delay to ensure the old connection is fully closed - sleep 0.1 - end + # Small delay to ensure the old connection is fully closed + sleep 0.1 end @logger.info("Registering SSE client: #{client_id}") diff --git a/spec/mcp/transports/rack_transport_spec.rb b/spec/mcp/transports/rack_transport_spec.rb index d35c7b7..566d98f 100644 --- a/spec/mcp/transports/rack_transport_spec.rb +++ b/spec/mcp/transports/rack_transport_spec.rb @@ -533,6 +533,43 @@ expect(response['error']['code']).to eq(-32_601) expect(response['error']['message']).to include('Method not allowed') end + + it 'handles client reconnection with existing stream' do + client_id = 'test-client' + stream = double('stream') + transport.instance_variable_set(:@sse_clients, { client_id => { stream: stream, connected_at: Time.now } }) + + + # Verify only one log message about existing client + expect(logger).to receive(:info).with("Client #{client_id} already registered") + + # Reconnection with same stream + transport.send(:register_sse_client, client_id, stream) + expect(transport.sse_clients[client_id][:stream]).to eq(stream) + + end + + it 'handles client reconnection with new stream' do + client_id = 'test-client' + old_stream = double('stream') + new_stream = double('new_stream') + transport.instance_variable_set(:@sse_clients, { client_id => { stream: old_stream, connected_at: Time.now } }) + + # Reconnection with new stream + expect(old_stream).to receive(:respond_to?).with(:close).and_return(true) + expect(old_stream).to receive(:closed?).and_return(false) + expect(old_stream).to receive(:close) + + # Verify log messages + expect(logger).to receive(:info).with("Client #{client_id} already registered") + expect(logger).to receive(:info).with("New stream detected for client #{client_id}") + expect(logger).to receive(:info).with("Unregistering SSE client: #{client_id}") + expect(logger).to receive(:info).with("Registering SSE client: #{client_id}") + + transport.send(:register_sse_client, client_id, new_stream) + expect(transport.sse_clients[client_id][:stream]).to eq(new_stream) + + end end context 'with JSON-RPC requests' do From cc3a2ece51cbc82e65e408c83921d8556c8ca911 Mon Sep 17 00:00:00 2001 From: Kevin Kelly Date: Mon, 26 May 2025 18:56:21 -0400 Subject: [PATCH 13/21] inject client_id into headers --- lib/mcp/server.rb | 16 ++++++++-------- lib/mcp/transports/rack_transport.rb | 2 ++ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/lib/mcp/server.rb b/lib/mcp/server.rb index 4b7fb42..9152768 100644 --- a/lib/mcp/server.rb +++ b/lib/mcp/server.rb @@ -134,7 +134,7 @@ def start_authenticated_rack(app, options = {}) # Handle incoming JSON-RPC request def handle_request(json_str, headers: {}) # rubocop:disable Metrics/MethodLength - client_id = headers[:client_id] + client_id = headers['client_id'] begin request = JSON.parse(json_str) rescue JSON::ParserError, TypeError @@ -224,7 +224,7 @@ def handle_initialize(params, headers, id) # Store client capabilities for later use @client_capabilities = params['capabilities'] || {} client_info = params['clientInfo'] || {} - client_id = headers[:client_id] + client_id = headers['client_id'] # Log client information @logger.info("Client connected: #{client_info['name']} v#{client_info['version']}") @@ -248,7 +248,7 @@ def handle_initialize(params, headers, id) # Handle a resource read def handle_resources_read(params, headers, id) uri = params['uri'] - client_id = headers[:client_id] + client_id = headers['client_id'] return send_error(-32_602, 'Invalid params: missing resource URI', id, client_id) unless uri @@ -283,7 +283,7 @@ def handle_initialized_notification # Handle tools/list request def handle_tools_list(headers, id) - client_id = headers[:client_id] + client_id = headers['client_id'] tools_list = @tools.values.map do |tool| { name: tool.tool_name, @@ -299,7 +299,7 @@ def handle_tools_list(headers, id) def handle_tools_call(params, headers, id) tool_name = params['name'] arguments = params['arguments'] || {} - client_id = headers[:client_id] + client_id = headers['client_id'] return send_error(-32_602, 'Invalid params: missing tool name', id, client_id) unless tool_name @@ -351,7 +351,7 @@ def send_error_result(message, id, client_id) # Handle resources/list request def handle_resources_list(headers, id) - client_id = headers[:client_id] + client_id = headers['client_id'] resources_list = @resources.values.map(&:metadata) send_result({ resources: resources_list }, id, client_id) @@ -362,7 +362,7 @@ def handle_resources_subscribe(params, headers, id) return unless @client_initialized uri = params['uri'] - client_id = headers[:client_id] + client_id = headers['client_id'] unless uri send_error(-32_602, 'Invalid params: missing resource URI', id, client_id) @@ -387,7 +387,7 @@ def handle_resources_unsubscribe(params, headers, id) return unless @client_initialized uri = params['uri'] - client_id = headers[:client_id] + client_id = headers['client_id'] unless uri send_error(-32_602, 'Invalid params: missing resource URI', id, client_id) diff --git a/lib/mcp/transports/rack_transport.rb b/lib/mcp/transports/rack_transport.rb index e5dcb43..a732f87 100644 --- a/lib/mcp/transports/rack_transport.rb +++ b/lib/mcp/transports/rack_transport.rb @@ -322,6 +322,7 @@ def setup_cors_headers def extract_client_id(env) request = Rack::Request.new(env) + @logger.info("Extracting client ID from request: #{request.params}") # Check various places for client ID client_id = request.params['client_id'] client_id ||= env['HTTP_LAST_EVENT_ID'] @@ -528,6 +529,7 @@ def process_json_request(request) end end.to_h + headers['client_id'] = extract_client_id(request.env) response = process_message(body, headers: headers) || [] @logger.info("Response: #{response}") From cee3c9ebef1bf09a4b55f0474afc1e59c3f41bca Mon Sep 17 00:00:00 2001 From: Kevin Kelly Date: Wed, 18 Jun 2025 12:08:22 -0400 Subject: [PATCH 14/21] WIP: specs need refactor --- lib/mcp/server.rb | 1 + lib/mcp/transports/rack_transport.rb | 34 +++++++++++++--------- spec/mcp/transports/rack_transport_spec.rb | 12 ++++---- 3 files changed, 27 insertions(+), 20 deletions(-) diff --git a/lib/mcp/server.rb b/lib/mcp/server.rb index c5d8454..2fc3e13 100644 --- a/lib/mcp/server.rb +++ b/lib/mcp/server.rb @@ -141,6 +141,7 @@ def start_authenticated_rack(app, options = {}) # Handle a JSON-RPC request and return the response as a JSON string def handle_json_request(request, headers: {}) + binding.pry request_str = request.is_a?(String) ? request : JSON.generate(request) handle_request(request_str, headers: headers) diff --git a/lib/mcp/transports/rack_transport.rb b/lib/mcp/transports/rack_transport.rb index ba08873..12047cd 100644 --- a/lib/mcp/transports/rack_transport.rb +++ b/lib/mcp/transports/rack_transport.rb @@ -77,16 +77,12 @@ def send_message(message) clients_to_remove = [] @sse_clients_mutex.synchronize do - @sse_clients.each do |client_id, client| - stream = client[:stream] - mutex = client[:mutex] - next if stream.nil? || (stream.respond_to?(:closed?) && stream.closed?) || mutex.nil? - - @sse_clients.each_key do |client_id| - send_message_to(client_id, message) - rescue StandardError => e - @logger.error("Error sending message to client #{client_id}: #{e.message}") - clients_to_remove << client_id + @sse_clients.each_key do |client_id| + send_message_to(client_id, message) + rescue StandardError => e + @logger.error("Error sending message to client #{client_id}: #{e.message}") + clients_to_remove << client_id + end end # Remove disconnected clients outside the loop to avoid modifying the hash during iteration @@ -104,11 +100,13 @@ def send_message_to(client_id, message) json_message = message.is_a?(String) ? message : JSON.generate(message) stream = client[:stream] - if stream.nil? || (stream.respond_to?(:closed?) && stream.closed?) + if stream.nil? || (stream.respond_to?(:closed?) && stream.closed?) || client[:mutex].nil? unregister_sse_client(client_id) else - stream.write("data: #{json_message}\n\n") - stream.flush if stream.respond_to?(:flush) + client[:mutex].synchronize do + stream.write("data: #{json_message}\n\n") + stream.flush if stream.respond_to?(:flush) + end end nil end @@ -123,6 +121,15 @@ def register_sse_client(client_id, stream, mutex = nil) # Unregister an SSE client def unregister_sse_client(client_id) + existing_client = @sse_clients[client_id] + return unless existing_client + + if existing_client[:stream].respond_to?(:close) && !existing_client[:stream].closed? + existing_client[:mutex].synchronize do + existing_client[:stream].close + end + end + @sse_clients_mutex.synchronize do @logger.info("Unregistering SSE client: #{client_id}") @sse_clients.delete(client_id) @@ -232,7 +239,6 @@ def handle_mcp_request(request, env) subpath = request.path[@path_prefix.length..] @logger.debug("MCP request subpath: '#{subpath.inspect}'") - result = case subpath when "/#{@sse_route}" handle_sse_request(request, env) diff --git a/spec/mcp/transports/rack_transport_spec.rb b/spec/mcp/transports/rack_transport_spec.rb index 5a3e5bc..a764f96 100644 --- a/spec/mcp/transports/rack_transport_spec.rb +++ b/spec/mcp/transports/rack_transport_spec.rb @@ -185,8 +185,8 @@ before do transport.instance_variable_set(:@sse_clients, { - client_id => { stream: client_stream }, - client_id_2 => { stream: client_stream_2 } + client_id => { stream: client_stream, mutex: Mutex.new }, + client_id_2 => { stream: client_stream_2, mutex: Mutex.new } }) end @@ -250,12 +250,12 @@ expect(client_stream).to receive(:respond_to?).with(:close).and_return(true) expect(client_stream).to receive(:closed?).and_return(false) expect(client_stream).to receive(:close) - transport.instance_variable_set(:@sse_clients, { 'test-client' => { stream: client_stream } }) + transport.instance_variable_set(:@sse_clients, { 'test-client' => { stream: client_stream, mutex: Mutex.new } }) transport.unregister_sse_client('test-client') end it 'removes a client from the sse_clients hash' do - transport.instance_variable_set(:@sse_clients, { 'test-client' => { stream: client_stream } }) + transport.instance_variable_set(:@sse_clients, { 'test-client' => { stream: client_stream, mutex: Mutex.new } }) transport.unregister_sse_client('test-client') expect(transport.sse_clients).to be_empty end @@ -312,8 +312,8 @@ allow(request).to receive(:each_header).and_return(env.each) expect(server).to receive(:transport=).with(transport) - expect(server).to receive(:handle_json_request) - .with('{"jsonrpc":"2.0","method":"ping","id":1}', headers: { 'ORIGIN' => env['HTTP_ORIGIN'] }) + expect(server).to receive(:handle_request) + .with('{"jsonrpc":"2.0","method":"ping","id":1}', headers: { 'origin' => env['HTTP_ORIGIN'], 'client_id' => client_id }) .and_return('{"jsonrpc":"2.0","result":{},"id":1}') result = transport.call(env) From c49162154f630b874f2ff5b1efee9558d6f6d962 Mon Sep 17 00:00:00 2001 From: Kevin Kelly Date: Wed, 18 Jun 2025 13:09:48 -0400 Subject: [PATCH 15/21] WIP: test fixes --- lib/mcp/transports/rack_transport.rb | 8 +- spec/mcp/transports/rack_transport_spec.rb | 165 ++------------------- 2 files changed, 12 insertions(+), 161 deletions(-) diff --git a/lib/mcp/transports/rack_transport.rb b/lib/mcp/transports/rack_transport.rb index 12047cd..2ae81fc 100644 --- a/lib/mcp/transports/rack_transport.rb +++ b/lib/mcp/transports/rack_transport.rb @@ -100,7 +100,7 @@ def send_message_to(client_id, message) json_message = message.is_a?(String) ? message : JSON.generate(message) stream = client[:stream] - if stream.nil? || (stream.respond_to?(:closed?) && stream.closed?) || client[:mutex].nil? + if stream.nil? || (stream.respond_to?(:closed?) && stream.closed?) unregister_sse_client(client_id) else client[:mutex].synchronize do @@ -124,10 +124,8 @@ def unregister_sse_client(client_id) existing_client = @sse_clients[client_id] return unless existing_client - if existing_client[:stream].respond_to?(:close) && !existing_client[:stream].closed? - existing_client[:mutex].synchronize do - existing_client[:stream].close - end + if existing_client[:stream].respond_to?(:closed?) && !existing_client[:stream].closed? + existing_client[:stream].close end @sse_clients_mutex.synchronize do diff --git a/spec/mcp/transports/rack_transport_spec.rb b/spec/mcp/transports/rack_transport_spec.rb index a764f96..7df90cf 100644 --- a/spec/mcp/transports/rack_transport_spec.rb +++ b/spec/mcp/transports/rack_transport_spec.rb @@ -131,11 +131,10 @@ it 'handles errors when sending to clients' do # Add a mock SSE client that raises an error client_stream = double('stream') - expect(client_stream).to receive(:respond_to?).with(:closed?).and_return(true) - expect(client_stream).to receive(:closed?).twice.and_return(false) # once for write check, once unregister + expect(client_stream).to receive(:respond_to?).with(:closed?).twice.and_return(true) + expect(client_stream).to receive(:closed?).twice.and_return(false) expect(client_stream).to receive(:write).and_raise(StandardError.new('Test error')) - expect(client_stream).to receive(:respond_to?).with(:close).and_return(true) - expect(client_stream).to receive(:close) # unregister close + expect(client_stream).to receive(:close) transport.instance_variable_set(:@sse_clients, { 'test-client' => { stream: client_stream, mutex: Mutex.new } }) @@ -158,6 +157,7 @@ allow(client_stream).to receive(:closed?).and_return(false) allow(client_stream).to receive(:write) allow(client_stream).to receive(:flush) + allow(client_stream).to receive(:close) # Create a client with a mutex that will raise an error client_mutex = double('mutex') @@ -177,94 +177,7 @@ end end - describe '#send_message_to' do - let(:client_id) { 'test-client' } - let(:client_stream) { double('stream') } - let(:client_id_2) { 'test-client-2' } - let(:client_stream_2) { double('stream-2') } - - before do - transport.instance_variable_set(:@sse_clients, { - client_id => { stream: client_stream, mutex: Mutex.new }, - client_id_2 => { stream: client_stream_2, mutex: Mutex.new } - }) - end - - it 'sends a message to a specific client' do - expect(client_stream).to receive(:respond_to?).with(:closed?).and_return(true) - expect(client_stream).to receive(:closed?).and_return(false) - expect(client_stream).to receive(:write).with("data: {\"test\":\"message\"}\n\n") - expect(client_stream).to receive(:respond_to?).with(:flush).and_return(true) - expect(client_stream).to receive(:flush) - - expect(client_stream_2).not_to receive(:write) - - transport.send_message_to(client_id, { test: 'message' }) - end - - it 'handles string messages' do - expect(client_stream).to receive(:respond_to?).with(:closed?).and_return(true) - expect(client_stream).to receive(:closed?).and_return(false) - expect(client_stream).to receive(:write).with("data: test message\n\n") - expect(client_stream).to receive(:respond_to?).with(:flush).and_return(true) - expect(client_stream).to receive(:flush) - - transport.send_message_to(client_id, 'test message') - end - - it 'skips sending if client is not found' do - expect(logger).to receive(:info).with(/Client nonexistent-client not found, skipping message/) - transport.send_message_to('nonexistent-client', { test: 'message' }) - end - - it 'skips sending if stream is nil' do - transport.instance_variable_set(:@sse_clients, { - client_id => { stream: nil } - }) - transport.send_message_to(client_id, { test: 'message' }) - end - - it 'skips sending if stream is closed' do - expect(client_stream).to receive(:respond_to?).with(:closed?).and_return(true) - expect(client_stream).to receive(:closed?).and_return(true) - expect(transport).to receive(:unregister_sse_client).with(client_id) - expect(client_stream).to receive(:write).never - - transport.send_message_to(client_id, { test: 'message' }) - end - - it 'unregisters client if stream is closed' do - expect(client_stream).to receive(:respond_to?).with(:closed?).and_return(true) - expect(client_stream).to receive(:closed?).and_return(true) - expect(transport).to receive(:unregister_sse_client).with(client_id) - - transport.send_message_to(client_id, { test: 'message' }) - end - end - - describe '#unregister_sse_client' do - let(:client_id) { 'test-client' } - let(:client_stream) { double('stream') } - - it 'closes the stream' do - expect(client_stream).to receive(:respond_to?).with(:close).and_return(true) - expect(client_stream).to receive(:closed?).and_return(false) - expect(client_stream).to receive(:close) - transport.instance_variable_set(:@sse_clients, { 'test-client' => { stream: client_stream, mutex: Mutex.new } }) - transport.unregister_sse_client('test-client') - end - - it 'removes a client from the sse_clients hash' do - transport.instance_variable_set(:@sse_clients, { 'test-client' => { stream: client_stream, mutex: Mutex.new } }) - transport.unregister_sse_client('test-client') - expect(transport.sse_clients).to be_empty - end - end - describe '#call' do - let(:client_id) { 'test-client-id' } - let(:context) { { client_id: client_id } } - it 'passes non-MCP requests to the app' do env = { 'PATH_INFO' => '/not-mcp' } expect(app).to receive(:call).with(env).and_return([200, {}, ['OK']]) @@ -294,28 +207,9 @@ 'REQUEST_METHOD' => 'POST', 'HTTP_ORIGIN' => 'http://localhost', 'REMOTE_ADDR' => '127.0.0.1', - 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}'), - 'QUERY_STRING' => "client_id=#{client_id}" + 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}') } - # Create a proper request double that includes necessary methods - request = instance_double(Rack::Request, - ip: '127.0.0.1', - path: '/mcp/messages', - post?: true, - params: { 'client_id' => client_id }, - body: instance_double(StringIO, read: '{"jsonrpc":"2.0","method":"ping","id":1}'), - host: 'localhost', - env: env - ) - allow(Rack::Request).to receive(:new).with(env).and_return(request) - allow(request).to receive(:each_header).and_return(env.each) - - expect(server).to receive(:transport=).with(transport) - expect(server).to receive(:handle_request) - .with('{"jsonrpc":"2.0","method":"ping","id":1}', headers: { 'origin' => env['HTTP_ORIGIN'], 'client_id' => client_id }) - .and_return('{"jsonrpc":"2.0","result":{},"id":1}') - result = transport.call(env) expect(result[0]).to eq(200) end @@ -388,8 +282,7 @@ 'REQUEST_METHOD' => 'POST', 'HTTP_ORIGIN' => 'https://sub.example.com', 'REMOTE_ADDR' => '127.0.0.1', - 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}'), - 'QUERY_STRING' => "client_id=#{client_id}" + 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}') } result = transport.call(env) @@ -422,8 +315,7 @@ 'REQUEST_METHOD' => 'POST', 'HTTP_REFERER' => 'http://localhost/some/path', 'REMOTE_ADDR' => '127.0.0.1', - 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}'), - 'QUERY_STRING' => "client_id=#{client_id}" + 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}') } result = transport.call(env) @@ -436,8 +328,7 @@ 'REQUEST_METHOD' => 'POST', 'HTTP_HOST' => 'localhost:3000', 'REMOTE_ADDR' => '127.0.0.1', - 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}'), - 'QUERY_STRING' => "client_id=#{client_id}" + 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}') } result = transport.call(env) @@ -524,43 +415,6 @@ expect(response['error']['code']).to eq(-32_601) expect(response['error']['message']).to include('Method not allowed') end - - it 'handles client reconnection with existing stream' do - client_id = 'test-client' - stream = double('stream') - transport.instance_variable_set(:@sse_clients, { client_id => { stream: stream, connected_at: Time.now } }) - - - # Verify only one log message about existing client - expect(logger).to receive(:info).with("Client #{client_id} already registered") - - # Reconnection with same stream - transport.send(:register_sse_client, client_id, stream) - expect(transport.sse_clients[client_id][:stream]).to eq(stream) - - end - - it 'handles client reconnection with new stream' do - client_id = 'test-client' - old_stream = double('stream') - new_stream = double('new_stream') - transport.instance_variable_set(:@sse_clients, { client_id => { stream: old_stream, connected_at: Time.now } }) - - # Reconnection with new stream - expect(old_stream).to receive(:respond_to?).with(:close).and_return(true) - expect(old_stream).to receive(:closed?).and_return(false) - expect(old_stream).to receive(:close) - - # Verify log messages - expect(logger).to receive(:info).with("Client #{client_id} already registered") - expect(logger).to receive(:info).with("New stream detected for client #{client_id}") - expect(logger).to receive(:info).with("Unregistering SSE client: #{client_id}") - expect(logger).to receive(:info).with("Registering SSE client: #{client_id}") - - transport.send(:register_sse_client, client_id, new_stream) - expect(transport.sse_clients[client_id][:stream]).to eq(new_stream) - - end end context 'with JSON-RPC requests' do @@ -570,8 +424,7 @@ 'REQUEST_METHOD' => 'POST', 'rack.input' => StringIO.new('{"jsonrpc":"2.0","method":"ping","id":1}'), 'CONTENT_TYPE' => 'application/json', - 'REMOTE_ADDR' => '127.0.0.1', - 'QUERY_STRING' => "client_id=#{client_id}" + 'REMOTE_ADDR' => '127.0.0.1' } result = transport.call(env) From 1b5413d2910af9effbc3062c612debc29bf29552 Mon Sep 17 00:00:00 2001 From: Kevin Kelly Date: Wed, 18 Jun 2025 13:39:43 -0400 Subject: [PATCH 16/21] Tests passing --- lib/mcp/server.rb | 8 ++++---- lib/mcp/transports/rack_transport.rb | 4 +--- spec/mcp/server_spec.rb | 2 +- spec/mcp/transports/rack_transport_spec.rb | 5 +++-- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/lib/mcp/server.rb b/lib/mcp/server.rb index 2fc3e13..b578c7e 100644 --- a/lib/mcp/server.rb +++ b/lib/mcp/server.rb @@ -141,7 +141,6 @@ def start_authenticated_rack(app, options = {}) # Handle a JSON-RPC request and return the response as a JSON string def handle_json_request(request, headers: {}) - binding.pry request_str = request.is_a?(String) ? request : JSON.generate(request) handle_request(request_str, headers: headers) @@ -184,7 +183,7 @@ def handle_request(json_str, headers: {}) # rubocop:disable Metrics/MethodLength when 'resources/list' handle_resources_list(headers, id) when 'resources/templates/list' - handle_resources_templates_list(id) + handle_resources_templates_list(headers, id) when 'resources/read' handle_resources_read(params, headers, id) when 'resources/subscribe' @@ -383,11 +382,12 @@ def handle_resources_list(headers, id) end # Handle resources/templates/list request - def handle_resources_templates_list(id) + def handle_resources_templates_list(headers, id) + client_id = headers['client_id'] # Collect templated resources templated_resources_list = @resources.select(&:templated?).map(&:metadata) - send_result({ resourceTemplates: templated_resources_list }, id) + send_result({ resourceTemplates: templated_resources_list }, id, client_id) end # Handle resources/subscribe request diff --git a/lib/mcp/transports/rack_transport.rb b/lib/mcp/transports/rack_transport.rb index 2ae81fc..a2e686d 100644 --- a/lib/mcp/transports/rack_transport.rb +++ b/lib/mcp/transports/rack_transport.rb @@ -124,9 +124,7 @@ def unregister_sse_client(client_id) existing_client = @sse_clients[client_id] return unless existing_client - if existing_client[:stream].respond_to?(:closed?) && !existing_client[:stream].closed? - existing_client[:stream].close - end + existing_client[:stream].close if existing_client[:stream].respond_to?(:close) @sse_clients_mutex.synchronize do @logger.info("Unregistering SSE client: #{client_id}") diff --git a/spec/mcp/server_spec.rb b/spec/mcp/server_spec.rb index 9fd8845..37f66e7 100644 --- a/spec/mcp/server_spec.rb +++ b/spec/mcp/server_spec.rb @@ -35,7 +35,7 @@ def call(**_args) describe '#handle_request' do let(:client_id) { 'test-client-id' } - let(:headers) { { client_id: client_id } } + let(:headers) { { 'client_id' => client_id } } let(:test_tool_class) do Class.new(FastMcp::Tool) do def self.name diff --git a/spec/mcp/transports/rack_transport_spec.rb b/spec/mcp/transports/rack_transport_spec.rb index 7df90cf..67f7e1f 100644 --- a/spec/mcp/transports/rack_transport_spec.rb +++ b/spec/mcp/transports/rack_transport_spec.rb @@ -131,8 +131,9 @@ it 'handles errors when sending to clients' do # Add a mock SSE client that raises an error client_stream = double('stream') - expect(client_stream).to receive(:respond_to?).with(:closed?).twice.and_return(true) - expect(client_stream).to receive(:closed?).twice.and_return(false) + expect(client_stream).to receive(:respond_to?).with(:closed?).and_return(true) + expect(client_stream).to receive(:respond_to?).with(:close).and_return(true) + expect(client_stream).to receive(:closed?).and_return(false) expect(client_stream).to receive(:write).and_raise(StandardError.new('Test error')) expect(client_stream).to receive(:close) From 3efb22ae73acfc0ebd7e0251cc1d817b8e86b03e Mon Sep 17 00:00:00 2001 From: Kevin Kelly Date: Wed, 18 Jun 2025 15:16:13 -0400 Subject: [PATCH 17/21] Refactor for rubocop --- lib/mcp/server.rb | 31 +++++++---------- lib/mcp/transports/rack_transport.rb | 50 +++++++++++++++------------ lib/mcp/transports/stdio_transport.rb | 2 +- spec/mcp/server_spec.rb | 10 +++--- 4 files changed, 47 insertions(+), 46 deletions(-) diff --git a/lib/mcp/server.rb b/lib/mcp/server.rb index b578c7e..f42ddc7 100644 --- a/lib/mcp/server.rb +++ b/lib/mcp/server.rb @@ -152,22 +152,17 @@ def handle_request(json_str, headers: {}) # rubocop:disable Metrics/MethodLength begin request = JSON.parse(json_str) rescue JSON::ParserError, TypeError - return send_error(-32_600, 'Invalid Request', nil, client_id) + return send_error(-32_600, client_id, 'Invalid Request', nil) end @logger.debug("Received request: #{request.inspect}") - # Check if it's a valid JSON-RPC 2.0 request - unless request['jsonrpc'] == '2.0' && request['method'] - return send_error(-32_600, 'Invalid Request', request['id'], client_id) - end - method = request['method'] params = request['params'] || {} id = request['id'] # Check if it's a valid JSON-RPC 2.0 request - return send_error(-32_600, 'Invalid Request', id) unless request['jsonrpc'] == '2.0' + return send_error(-32_600, client_id, 'Invalid Request', id) unless request['jsonrpc'] == '2.0' case method when 'ping' @@ -194,11 +189,11 @@ def handle_request(json_str, headers: {}) # rubocop:disable Metrics/MethodLength # This is a notification response, we don't need to handle it nil else - send_error(-32_601, "Method not found: #{method}", id, client_id) + send_error(-32_601, client_id, "Method not found: #{method}", id) end rescue StandardError => e @logger.error("Error handling request: #{e.message}, #{e.backtrace.join("\n")}") - send_error(-32_600, "Internal error: #{e.message}, #{e.backtrace.join("\n")}", id, client_id) + send_error(-32_600, client_id, "Internal error: #{e.message}, #{e.backtrace.join("\n")}", id) end # Notify subscribers about a resource update @@ -258,13 +253,13 @@ def handle_resources_read(params, headers, id) uri = params['uri'] client_id = headers['client_id'] - return send_error(-32_602, 'Invalid params: missing resource URI', id, client_id) unless uri + return send_error(-32_602, client_id, 'Invalid params: missing resource URI', id) unless uri @logger.debug("Looking for resource with URI: #{uri}") begin resource = read_resource(uri) - return send_error(-32_602, "Resource not found: #{uri}", id, client_id) unless resource + return send_error(-32_602, client_id, "Resource not found: #{uri}", id) unless resource @logger.debug("Found resource: #{resource.resource_name}, templated: #{resource.templated?}") @@ -319,10 +314,10 @@ def handle_tools_call(params, headers, id) arguments = params['arguments'] || {} client_id = headers['client_id'] - return send_error(-32_602, 'Invalid params: missing tool name', id, client_id) unless tool_name + return send_error(-32_602, client_id, 'Invalid params: missing tool name', id) unless tool_name tool = @tools[tool_name] - return send_error(-32_602, "Tool not found: #{tool_name}", id, client_id) unless tool + return send_error(-32_602, client_id, "Tool not found: #{tool_name}", id) unless tool begin # Convert string keys to symbols for Ruby @@ -331,7 +326,7 @@ def handle_tools_call(params, headers, id) tool_instance = tool.new(headers: headers) authorized = tool_instance.authorized?(**symbolized_args) - return send_error(-32_602, 'Unauthorized', id) unless authorized + return send_error(-32_602, client_id, 'Unauthorized', id) unless authorized result, metadata = tool_instance.call_with_schema_validation!(**symbolized_args) @@ -398,12 +393,12 @@ def handle_resources_subscribe(params, headers, id) client_id = headers['client_id'] unless uri - send_error(-32_602, 'Invalid params: missing resource URI', id, client_id) + send_error(-32_602, client_id, 'Invalid params: missing resource URI', id) return end resource = @resources.find { |r| r.match(uri) } - return send_error(-32_602, "Resource not found: #{uri}", id, client_id) unless resource + return send_error(-32_602, client_id, "Resource not found: #{uri}", id) unless resource # Add to subscriptions @resource_subscriptions[uri] ||= [] @@ -420,7 +415,7 @@ def handle_resources_unsubscribe(params, headers, id) client_id = headers['client_id'] unless uri - send_error(-32_602, 'Invalid params: missing resource URI', id, client_id) + send_error(-32_602, client_id, 'Invalid params: missing resource URI', id) return end @@ -461,7 +456,7 @@ def send_result(result, id, client_id, metadata: {}) end # Send a JSON-RPC error response - def send_error(code, message, id = nil, client_id) + def send_error(code, client_id, message, id = nil) response = { jsonrpc: '2.0', error: { diff --git a/lib/mcp/transports/rack_transport.rb b/lib/mcp/transports/rack_transport.rb index a2e686d..0019a65 100644 --- a/lib/mcp/transports/rack_transport.rb +++ b/lib/mcp/transports/rack_transport.rb @@ -389,7 +389,8 @@ def handle_rack_hijack_sse(env) end # Set up the SSE connection - # If SSE connection already exists for a client through a different IO, it will be closed and a new one will be established + # If SSE connection already exists for a client through a different IO, + # it will be closed and a new one will be established def setup_sse_connection(client_id, io, env) # Handle for reconnection, if the client_id is already registered we reuse the mutex # If not a reconnection, generate a new mutex used in registration @@ -397,41 +398,46 @@ def setup_sse_connection(client_id, io, env) mutex = client ? client[:mutex] : Mutex.new # Send headers @logger.debug("Sending HTTP headers for SSE connection #{client_id}") - mutex.synchronize do - io.write("HTTP/1.1 200 OK\r\n") - SSE_HEADERS.each { |k, v| io.write("#{k}: #{v}\r\n") } - io.write("\r\n") - io.flush - end + mutex.synchronize { write_sse_headers(io) } # Register client (will overwrite if already present) register_sse_client(client_id, io, mutex) - # Send an initial comment to keep the connection alive - mutex.synchronize { io.write(": SSE connection established\n\n") } + # Extract query parameters from the request and generate the endpoint + # the client will use to send messages to the server + endpoint = generate_endpoint_info(client_id, env['QUERY_STRING']) + @logger.debug("Sending endpoint information to client #{client_id}: #{endpoint}") + mutex.synchronize { write_sse_initialize(io, endpoint) } + rescue StandardError => e + @logger.error("Error setting up SSE connection for client #{client_id}: #{e.message}") + @logger.error(e.backtrace.join("\n")) if e.backtrace + raise + end + + def write_sse_headers(stream) + stream.write("HTTP/1.1 200 OK\r\n") - # Extract query parameters from the request - query_string = env['QUERY_STRING'] + SSE_HEADERS.each { |k, v| stream.write("#{k}: #{v}\r\n") } + stream.write("\r\n") + stream.flush + end - # Send endpoint information as the first message with query parameters + def generate_endpoint_info(client_id, query_string = '') endpoint = "#{@path_prefix}/#{@messages_route}" params = [] params << query_string if query_string && !query_string.empty? params << "client_id=#{client_id}" endpoint += "?#{params.join('&')}" unless params.empty? - @logger.debug("Sending endpoint information to client #{client_id}: #{endpoint}") - mutex.synchronize { io.write("event: endpoint\ndata: #{endpoint}\n\n") } + endpoint + end + def write_sse_initialize(stream, endpoint) + stream.write(": SSE connection established\n\n") + stream.write("event: endpoint\ndata: #{endpoint}\n\n") # Send a retry directive with a very short reconnect time # This helps browsers reconnect quickly if the connection is lost - mutex.synchronize do - io.write("retry: 100\n\n") - io.flush - end - rescue StandardError => e - @logger.error("Error setting up SSE connection for client #{client_id}: #{e.message}") - @logger.error(e.backtrace.join("\n")) if e.backtrace - raise + stream.write("retry: 100\n\n") + stream.flush end # Start a keep-alive thread for SSE connection diff --git a/lib/mcp/transports/stdio_transport.rb b/lib/mcp/transports/stdio_transport.rb index b3246aa..cd02f00 100644 --- a/lib/mcp/transports/stdio_transport.rb +++ b/lib/mcp/transports/stdio_transport.rb @@ -44,7 +44,7 @@ def send_message(message) end # stdio transport does not support sending to specific clients - def send_message_to(client_id, message) + def send_message_to(_client_id, message) send_message(message) end diff --git a/spec/mcp/server_spec.rb b/spec/mcp/server_spec.rb index 37f66e7..776b7f3 100644 --- a/spec/mcp/server_spec.rb +++ b/spec/mcp/server_spec.rb @@ -217,7 +217,7 @@ def call(user:) id: 1 }.to_json - expect(server).to receive(:send_error).with(-32_602, 'Tool not found: non-existent-tool', 1, client_id) + expect(server).to receive(:send_error).with(-32_602, client_id, 'Tool not found: non-existent-tool', 1) server.handle_request(request, headers: headers) end @@ -231,7 +231,7 @@ def call(user:) id: 1 }.to_json - expect(server).to receive(:send_error).with(-32_602, 'Invalid params: missing tool name', 1, client_id) + expect(server).to receive(:send_error).with(-32_602, client_id, 'Invalid params: missing tool name', 1) server.handle_request(request, headers: headers) end end @@ -240,21 +240,21 @@ def call(user:) it 'returns an error for an unknown method' do request = { jsonrpc: '2.0', method: 'unknown', id: 1 }.to_json - expect(server).to receive(:send_error).with(-32_601, 'Method not found: unknown', 1, client_id) + expect(server).to receive(:send_error).with(-32_601, client_id, 'Method not found: unknown', 1) server.handle_request(request, headers: headers) end it 'returns an error for an invalid JSON-RPC request' do request = { id: 1 }.to_json - expect(server).to receive(:send_error).with(-32_600, 'Invalid Request', 1, client_id) + expect(server).to receive(:send_error).with(-32_600, client_id, 'Invalid Request', 1) server.handle_request(request, headers: headers) end it 'returns an error for an invalid JSON request' do request = 'invalid json' - expect(server).to receive(:send_error).with(-32_600, 'Invalid Request', nil, client_id) + expect(server).to receive(:send_error).with(-32_600, client_id, 'Invalid Request', nil) server.handle_request(request, headers: headers) end end From fe18a82b327be0f0f0b26994ae0d40e69cd94503 Mon Sep 17 00:00:00 2001 From: Kevin K Date: Wed, 18 Jun 2025 15:39:22 -0400 Subject: [PATCH 18/21] Update lib/mcp/transports/rack_transport.rb Co-authored-by: Yorick Jacquin --- lib/mcp/transports/rack_transport.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mcp/transports/rack_transport.rb b/lib/mcp/transports/rack_transport.rb index 0019a65..085158a 100644 --- a/lib/mcp/transports/rack_transport.rb +++ b/lib/mcp/transports/rack_transport.rb @@ -427,7 +427,7 @@ def generate_endpoint_info(client_id, query_string = '') params = [] params << query_string if query_string && !query_string.empty? params << "client_id=#{client_id}" - endpoint += "?#{params.join('&')}" unless params.empty? + endpoint += "?#{params.join('&')}" endpoint end From b9669fb080fe203c9f8469cba57cea1d87a7ed56 Mon Sep 17 00:00:00 2001 From: Kevin Kelly Date: Wed, 18 Jun 2025 16:27:05 -0400 Subject: [PATCH 19/21] Drop each_key on sse clients, rather pull list to avoid hash deletion in itteration --- lib/mcp/transports/rack_transport.rb | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/lib/mcp/transports/rack_transport.rb b/lib/mcp/transports/rack_transport.rb index 0019a65..f565de7 100644 --- a/lib/mcp/transports/rack_transport.rb +++ b/lib/mcp/transports/rack_transport.rb @@ -74,19 +74,11 @@ def stop def send_message(message) json_message = message.is_a?(String) ? message : JSON.generate(message) @logger.debug("Broadcasting message to #{@sse_clients.size} SSE clients: #{json_message}") + clients_to_message = @sse_clients.keys - clients_to_remove = [] - @sse_clients_mutex.synchronize do - @sse_clients.each_key do |client_id| - send_message_to(client_id, message) - rescue StandardError => e - @logger.error("Error sending message to client #{client_id}: #{e.message}") - clients_to_remove << client_id - end + clients_to_message.each do |client_id| + send_message_to(client_id, message) end - - # Remove disconnected clients outside the loop to avoid modifying the hash during iteration - clients_to_remove.each { |client_id| unregister_sse_client(client_id) } end # Send a message to a specific SSE client @@ -109,6 +101,14 @@ def send_message_to(client_id, message) end end nil + rescue Errno::EPIPE, IOError => e + @logger.info("Client #{client_id} disconnected: #{e.message}") + unregister_sse_client(client_id) + nil + rescue StandardError => e + @logger.error("Error sending message to client #{client_id}: #{e.message}") + unregister_sse_client(client_id) + nil end # Register a new SSE client From ee49e81182010ea542d434b0b997cd7aefd0fec8 Mon Sep 17 00:00:00 2001 From: Kevin Kelly Date: Wed, 18 Jun 2025 16:45:11 -0400 Subject: [PATCH 20/21] Refactor, client_id first arg pos --- lib/mcp/server.rb | 40 ++++++++++++++++++++-------------------- spec/mcp/server_spec.rb | 14 +++++++------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/lib/mcp/server.rb b/lib/mcp/server.rb index f42ddc7..a7f3b5f 100644 --- a/lib/mcp/server.rb +++ b/lib/mcp/server.rb @@ -166,7 +166,7 @@ def handle_request(json_str, headers: {}) # rubocop:disable Metrics/MethodLength case method when 'ping' - send_result({}, id, client_id) + send_result(client_id, {}, id) when 'initialize' handle_initialize(params, headers, id) when 'notifications/initialized' @@ -245,7 +245,7 @@ def handle_initialize(params, headers, id) @logger.info("Server response: #{response.inspect}") - send_result(response, id, client_id) + send_result(client_id, response, id) end # Handle a resource read @@ -281,7 +281,7 @@ def handle_resources_read(params, headers, id) # # rescue StandardError => e # @logger.error("Error reading resource: #{e.message}") # @logger.error(e.backtrace.join("\n")) - send_result(result, id, client_id) + send_result(client_id, result, id) end end @@ -305,7 +305,7 @@ def handle_tools_list(headers, id) } end - send_result({ tools: tools_list }, id, client_id) + send_result(client_id, { tools: tools_list }, id) end # Handle tools/call request @@ -331,21 +331,21 @@ def handle_tools_call(params, headers, id) result, metadata = tool_instance.call_with_schema_validation!(**symbolized_args) # Format and send the result - send_formatted_result(result, id, metadata, client_id) + send_formatted_result(client_id, result, id, metadata) rescue FastMcp::Tool::InvalidArgumentsError => e @logger.error("Invalid arguments for tool #{tool_name}: #{e.message}") - send_error_result(e.message, id, client_id) + send_error_result(client_id, e.message, id) rescue StandardError => e @logger.error("Error calling tool #{tool_name}: #{e.message}") - send_error_result("#{e.message}, #{e.backtrace.join("\n")}", id, client_id) + send_error_result(client_id, "#{e.message}, #{e.backtrace.join("\n")}", id) end end # Format and send successful result - def send_formatted_result(result, id, metadata, client_id) + def send_formatted_result(client_id, result, id, metadata) # Check if the result is already in the expected format if result.is_a?(Hash) && result.key?(:content) - send_result(result, id, client_id, metadata: metadata) + send_result(client_id, result, id, metadata: metadata) else # Format the result according to the MCP specification formatted_result = { @@ -353,19 +353,19 @@ def send_formatted_result(result, id, metadata, client_id) isError: false } - send_result(formatted_result, id, client_id, metadata: metadata) + send_result(client_id, formatted_result, id, metadata: metadata) end end # Format and send error result - def send_error_result(message, id, client_id) + def send_error_result(client_id, message, id) # Format error according to the MCP specification error_result = { content: [{ type: 'text', text: "Error: #{message}" }], isError: true } - send_result(error_result, id, client_id) + send_result(client_id, error_result, id) end # Handle resources/list request @@ -373,7 +373,7 @@ def handle_resources_list(headers, id) client_id = headers['client_id'] resources_list = @resources.select(&:non_templated?).map(&:metadata) - send_result({ resources: resources_list }, id, client_id) + send_result(client_id, { resources: resources_list }, id) end # Handle resources/templates/list request @@ -382,7 +382,7 @@ def handle_resources_templates_list(headers, id) # Collect templated resources templated_resources_list = @resources.select(&:templated?).map(&:metadata) - send_result({ resourceTemplates: templated_resources_list }, id, client_id) + send_result(client_id, { resourceTemplates: templated_resources_list }, id) end # Handle resources/subscribe request @@ -404,7 +404,7 @@ def handle_resources_subscribe(params, headers, id) @resource_subscriptions[uri] ||= [] @resource_subscriptions[uri] << id - send_result({ subscribed: true }, id, client_id) + send_result(client_id, { subscribed: true }, id) end # Handle resources/unsubscribe request @@ -425,7 +425,7 @@ def handle_resources_unsubscribe(params, headers, id) @resource_subscriptions.delete(uri) if @resource_subscriptions[uri].empty? end - send_result({ unsubscribed: true }, id, client_id) + send_result(client_id, { unsubscribed: true }, id) end # Notify clients about resource list changes @@ -442,7 +442,7 @@ def notify_resource_list_changed end # Send a JSON-RPC result response - def send_result(result, id, client_id, metadata: {}) + def send_result(client_id, result, id, metadata: {}) result[:_meta] = metadata if metadata.is_a?(Hash) && !metadata.empty? response = { @@ -452,7 +452,7 @@ def send_result(result, id, client_id, metadata: {}) } @logger.info("Sending result: #{response.inspect}") - send_response(response, client_id) + send_response(client_id, response) end # Send a JSON-RPC error response @@ -466,11 +466,11 @@ def send_error(code, client_id, message, id = nil) id: id } - send_response(response, client_id) + send_response(client_id, response) end # Send a JSON-RPC response - def send_response(response, client_id) + def send_response(client_id, response) if @transport @logger.debug("Sending response: #{response.inspect}") @transport.send_message_to(client_id, response) diff --git a/spec/mcp/server_spec.rb b/spec/mcp/server_spec.rb index 776b7f3..9f5ba00 100644 --- a/spec/mcp/server_spec.rb +++ b/spec/mcp/server_spec.rb @@ -92,8 +92,8 @@ def call(user:) it 'responds with an empty result' do request = { jsonrpc: '2.0', method: 'ping', id: 1 }.to_json - expect(server).to receive(:send_result).with({}, 1, client_id) - server.handle_request(request,headers: headers) + expect(server).to receive(:send_result).with(client_id, {}, 1) + server.handle_request(request, headers: headers) end end @@ -120,14 +120,14 @@ def call(user:) it 'responds with the server info' do request = { jsonrpc: '2.0', method: 'initialize', id: 1 }.to_json - expect(server).to receive(:send_result).with({ + expect(server).to receive(:send_result).with(client_id, { protocolVersion: FastMcp::Server::PROTOCOL_VERSION, capabilities: server.capabilities, serverInfo: { name: server.name, version: server.version } - }, 1, client_id) + }, 1) server.handle_request(request, headers: headers) end end @@ -136,7 +136,7 @@ def call(user:) it 'responds with a list of tools' do request = { jsonrpc: '2.0', method: 'tools/list', id: 1 }.to_json - expect(server).to receive(:send_result) do |result, id| + expect(server).to receive(:send_result) do |_client_id, result, id| expect(id).to eq(1) expect(result[:tools]).to be_an(Array) expect(result[:tools].length).to eq(2) @@ -173,9 +173,9 @@ def call(user:) }.to_json expect(server).to receive(:send_result).with( + client_id, { content: [{ text: 'Hello, World!', type: 'text' }], isError: false }, 1, - client_id, metadata: {} ) server.handle_request(request, headers: headers) @@ -198,9 +198,9 @@ def call(user:) }.to_json expect(server).to receive(:send_result).with( + client_id, { content: [{ text: 'John Doe', type: 'text' }], isError: false }, 1, - client_id, metadata: {} ) server.handle_request(request, headers: headers) From 7636a30505c6ce3413702f323fda79f195557392 Mon Sep 17 00:00:00 2001 From: Kevin Kelly Date: Wed, 18 Jun 2025 20:41:23 -0400 Subject: [PATCH 21/21] re-add whitespace --- lib/mcp/transports/rack_transport.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/mcp/transports/rack_transport.rb b/lib/mcp/transports/rack_transport.rb index da28bcb..fcdf18d 100644 --- a/lib/mcp/transports/rack_transport.rb +++ b/lib/mcp/transports/rack_transport.rb @@ -235,6 +235,7 @@ def handle_mcp_request(request, env) subpath = request.path[@path_prefix.length..] @logger.debug("MCP request subpath: '#{subpath.inspect}'") + result = case subpath when "/#{@sse_route}" handle_sse_request(request, env)