Skip to content
Merged
5 changes: 3 additions & 2 deletions apps/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@
"format": "prettier --write ."
},
"dependencies": {
"@nao/shared": "*",
"@ai-sdk/anthropic": "^3.0.15",
"@ai-sdk/google": "^3.0.16",
"@ai-sdk/mistral": "^3.0.13",
"@ai-sdk/openai": "^3.0.1",
"@fastify/formbody": "^8.0.2",
"@fastify/static": "^8.1.0",
"@nao/shared": "*",
"@slack/web-api": "^7.13.0",
"@trpc/server": "^11.8.1",
"@types/nodemailer": "^7.0.9",
Expand All @@ -51,13 +51,14 @@
"fastify-plugin": "^5.1.0",
"fastify-raw-body": "^5.0.0",
"fastify-type-provider-zod": "^6.1.0",
"mcporter": "^0.7.3",
"minimatch": "^10.1.1",
"nodemailer": "^7.0.13",
"pg": "^8.16.3",
"postgres": "^3.4.8",
"posthog-node": "^5.21.2",
"react": "^19.2.4",
"react-dom": "^19.2.4",
"posthog-node": "^5.21.2",
"superjson": "^2.2.6",
"zod": "^4.2.1"
},
Expand Down
7 changes: 7 additions & 0 deletions apps/backend/src/agents/tools/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { mcpService } from '../../services/mcp.service';
import displayChart from './display-chart';
import executeSql from './execute-sql';
import grep from './grep';
Expand All @@ -15,3 +16,9 @@ export const tools = {
search,
suggest_follow_ups: suggestFollowUps,
};

export const getTools = () => {
const mcpTools = mcpService.getMcpTools();

return { ...tools, ...mcpTools };
};
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { EmailData } from '../../types/email';
import { EmailButton } from './emailButton';
import { EmailLayout } from './emailLayout';
import { WarningBox } from './warningBox';
import { EmailButton } from './email-button';
import { EmailLayout } from './email-layout';
import { WarningBox } from './warning-box';

export function ResetPasswordEmail({ userName, temporaryPassword, loginUrl, projectName }: EmailData) {
return (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { EmailData } from '../../types/email';
import { EmailButton } from './emailButton';
import { EmailLayout } from './emailLayout';
import { WarningBox } from './warningBox';
import { EmailButton } from './email-button';
import { EmailLayout } from './email-layout';
import { WarningBox } from './warning-box';

export function UserAddedToProjectEmail({ userName, projectName, loginUrl, to, temporaryPassword }: EmailData) {
const isNewUser = !!temporaryPassword;
Expand Down
2 changes: 2 additions & 0 deletions apps/backend/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const envSchema = z.object({

NAO_DEFAULT_PROJECT_PATH: z.string().optional(),

MCP_JSON_FILE_PATH: z.string().optional(),

POSTHOG_KEY: z.string().optional(),
POSTHOG_HOST: z.url({ message: 'POSTHOG_HOST must be a valid URL' }).optional(),
POSTHOG_DISABLED: z
Expand Down
6 changes: 6 additions & 0 deletions apps/backend/src/routes/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import { createUIMessageStreamResponse } from 'ai';
import { z } from 'zod/v4';

import type { App } from '../app';
import { env } from '../env';
import { authMiddleware } from '../middleware/auth';
import * as chatQueries from '../queries/chat.queries';
import { agentService } from '../services/agent.service';
import { mcpService } from '../services/mcp.service';
import { posthog, PostHogEvent } from '../services/posthog.service';
import { UIMessage } from '../types/chat';
import { llmProviderSchema } from '../types/llm';
Expand Down Expand Up @@ -67,6 +69,10 @@ export const chatRoutes = async (app: App) => {
return reply.status(403).send({ error: `You are not authorized to access this chat.` });
}

if (env.MCP_JSON_FILE_PATH) {
await mcpService.initializeMcpState();
}

const agent = await agentService.create({ ...chat, userId, projectId }, abortController, modelSelection);

posthog.capture(userId, PostHogEvent.MessageSent, {
Expand Down
9 changes: 4 additions & 5 deletions apps/backend/src/services/agent.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {

import { getInstructions } from '../agents/prompt';
import { CACHE_1H, CACHE_5M, createProviderModel } from '../agents/providers';
import { tools } from '../agents/tools';
import { getTools } from '../agents/tools';
import * as chatQueries from '../queries/chat.queries';
import * as llmConfigQueries from '../queries/project-llm-config.queries';
import { TokenCost, TokenUsage, UIChat, UIMessage } from '../types/chat';
Expand Down Expand Up @@ -131,7 +131,7 @@ export class AgentService {
}

class AgentManager {
private readonly _agent: ToolLoopAgent<never, typeof tools, never>;
private readonly _agent: ToolLoopAgent<never, ReturnType<typeof getTools>, never>;

constructor(
readonly chat: AgentChat,
Expand All @@ -142,7 +142,7 @@ class AgentManager {
) {
this._agent = new ToolLoopAgent({
...modelConfig,
tools,
tools: getTools(),
// On step 1+: cache user message (stable) + current step's last message (loop leaf)
prepareStep: ({ messages }) => {
return { messages: this._addCache(messages) };
Expand All @@ -158,8 +158,7 @@ class AgentManager {
},
): ReadableStream {
let error: unknown = undefined;
let result: StreamTextResult<typeof tools, never>;

let result: StreamTextResult<ReturnType<typeof getTools>, never>;
return createUIMessageStream<UIMessage>({
generateId: () => crypto.randomUUID(),
execute: async ({ writer }) => {
Expand Down
4 changes: 2 additions & 2 deletions apps/backend/src/services/email.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import type { Transporter } from 'nodemailer';
import nodemailer from 'nodemailer';
import { renderToString } from 'react-dom/server';

import { ResetPasswordEmail } from '../components/email/resetPasswordEmail';
import { UserAddedToProjectEmail } from '../components/email/userAddedToProjectEmail';
import { ResetPasswordEmail } from '../components/email/reset-password-email';
import { UserAddedToProjectEmail } from '../components/email/user-added-to-project-email';
import type { CreatedEmailData, EmailData, SendEmailParams } from '../types/email';

class EmailService {
Expand Down
231 changes: 231 additions & 0 deletions apps/backend/src/services/mcp.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
import type { Tool } from '@ai-sdk/provider-utils';
import { debounce } from '@nao/shared/utils';
import { jsonSchema, type JSONSchema7 } from 'ai';
import { readFileSync, watch } from 'fs';
import { callOnce, createRuntime, type Runtime, ServerDefinition, ServerToolInfo } from 'mcporter';

import { env } from '../env';
import { mcpJsonSchema, McpServerConfig, McpServerState } from '../types/mcp';
import { prefixToolName, removePrefixToolName, sanitizeTools } from '../utils/tools';
import { replaceEnvVars } from '../utils/utils';

export class McpService {
private _mcpJsonFilePath: string;
private _mcpServers: Record<string, McpServerConfig>;
private _fileWatcher: ReturnType<typeof watch> | null = null;
private _debouncedReconnect: () => void;
private _initialized = false;
private _mcpTools: Record<string, Tool> = {};
private _runtime: Runtime | null = null;
private _failedConnections: Record<string, string> = {};
private _toolsToServer: Map<string, string> = new Map();
public cachedMcpState: Record<string, McpServerState> = {};

constructor() {
this._mcpJsonFilePath = env.MCP_JSON_FILE_PATH || '';
this._mcpServers = {};

this._debouncedReconnect = debounce(async () => {
await this.loadMcpState();
}, 2000);
this._setupFileWatcher();
}

public async initializeMcpState(): Promise<void> {
if (this._initialized) {
return;
}

await this.loadMcpState();
this._initialized = true;
}

public async loadMcpState(): Promise<void> {
try {
await this._loadMcpServerFromFile();

await this._connectAllServers();

await this._cacheMcpState();
} catch (error) {
console.error('[mcp] Failed to cache MCP state:', error);
throw error;
}
}

public getMcpTools(): Record<string, Tool> {
const sanitizedMcpTools = Object.fromEntries(
Object.entries(this._mcpTools).map(([name, tool]) => {
const inputSchema = tool.inputSchema;

// If it's an AI SDK schema wrapper with jsonSchema getter
if (inputSchema && typeof inputSchema === 'object' && 'jsonSchema' in inputSchema) {
const originalJsonSchema = inputSchema.jsonSchema;
return [
name,
{
...tool,
inputSchema: {
...inputSchema,
jsonSchema: sanitizeTools(originalJsonSchema),
},
} as Tool,
];
}

// Otherwise, sanitize the schema directly
return [
name,
{
...tool,
inputSchema: sanitizeTools(inputSchema),
} as Tool,
];
}),
);
return sanitizedMcpTools;
}

private async _loadMcpServerFromFile(): Promise<void> {
if (!this._mcpJsonFilePath) {
this._mcpServers = {};
return;
}

try {
const fileContent = readFileSync(this._mcpJsonFilePath, 'utf8');
const resolvedContent = replaceEnvVars(fileContent);
const content = mcpJsonSchema.parse(JSON.parse(resolvedContent));
this._mcpServers = content.mcpServers;
} catch {
console.error(
`[mcp] Failed to read or parse MCP config file at ${this._mcpJsonFilePath}. Using empty configuration.`,
);
this._mcpServers = {};
}
}

private async _connectAllServers(): Promise<void> {
this._mcpTools = {};
this._runtime = await createRuntime();

const connectionPromises = Object.entries(this._mcpServers).map(async ([serverName, serverConfig]) => {
try {
if (!this._runtime) {
throw new Error('Runtime not initialized');
}
const definition = this._convertToServerDefinition(serverName, serverConfig);
this._runtime.registerDefinition(definition, { overwrite: true });
await this._listTools(serverName);
return { serverName, success: true };
} catch (error) {
console.error(`[mcp] Failed to connect to ${serverName}:`, error);
this._failedConnections[serverName] = (error as Error).message;
}
});

await Promise.all(connectionPromises);
}

// Convert MCP server config to MCPorter server definition
private _convertToServerDefinition(name: string, config: McpServerConfig): ServerDefinition {
if (config.type === 'http') {
return {
name,
command: {
kind: 'http',
url: config.url!,
},
};
}

return {
name,
command: {
kind: 'stdio',
command: config.command || '',
args: config.args || [],
cwd: process.cwd(),
},
env: config.env,
};
}

private async _listTools(serverName: string): Promise<void> {
if (!this._runtime) {
throw new Error('Runtime not initialized');
}

const tools = await this._runtime.listTools(serverName, {
includeSchema: true,
});

await this.cacheMcpTools(tools, serverName);
}

private async cacheMcpTools(tools: ServerToolInfo[], serverName: string): Promise<void> {
for (const tool of tools) {
const toolName = tool.name.startsWith(serverName) ? tool.name : prefixToolName(serverName, tool.name);
this._mcpTools[toolName] = {
description: tool.description,
inputSchema: jsonSchema(tool.inputSchema as JSONSchema7),
execute: async (toolArgs: Record<string, unknown>) => {
return await this._callTool(toolName, toolArgs);
},
};
this._toolsToServer.set(toolName, serverName);
}
}

private async _callTool(toolName: string, toolArgs: Record<string, unknown>): Promise<unknown> {
const serverName = this._toolsToServer.get(toolName);
if (!serverName) {
throw new Error(`Tool ${toolName} not found in any server`);
}

const result = await callOnce({
server: serverName,
toolName: removePrefixToolName(toolName),
args: toolArgs,
});

return result;
}

private async _cacheMcpState(): Promise<void> {
this.cachedMcpState = {};

for (const serverName of Object.keys(this._mcpServers)) {
const serverTools = Object.entries(this._mcpTools)
.filter(([toolName]) => this._toolsToServer.get(toolName) === serverName)
.map(([toolName, tool]) => ({
name: toolName,
description: tool.description,
input_schema: tool.inputSchema,
}));

this.cachedMcpState[serverName] = {
tools: serverTools,
error: this._failedConnections[serverName],
};
}
}

private _setupFileWatcher(): void {
if (!this._mcpJsonFilePath) {
return;
}

try {
this._fileWatcher = watch(this._mcpJsonFilePath, (eventType) => {
if (eventType === 'change') {
this._debouncedReconnect();
}
});
} catch (error) {
console.error('[mcp] Failed to setup file watcher:', error);
}
}
}

export const mcpService = new McpService();
13 changes: 13 additions & 0 deletions apps/backend/src/trpc/mcp.routes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { mcpService } from '../services/mcp.service';
import { adminProtectedProcedure, protectedProcedure, router } from './trpc';

export const mcpRoutes = router({
getState: protectedProcedure.query(() => {
return mcpService.cachedMcpState;
}),

reconnect: adminProtectedProcedure.mutation(async () => {
await mcpService.loadMcpState();
return mcpService.cachedMcpState;
}),
});
Loading
Loading