Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@ help:

# Start local development with Docker Compose in foreground
dev:
@if [ ! -f .env ]; then \
echo "❌ .env file not found!"; \
echo ""; \
echo "Please run setup first:"; \
echo " make setup"; \
echo ""; \
exit 1; \
@if [ -f .env ]; then \
echo "ℹ️ Loading environment overrides from .env"; \
else \
echo "ℹ️ No .env file detected; relying on current environment."; \
fi
@echo "🚀 Starting local development mode with Docker Compose..."
@echo " This will:"
Expand Down
114 changes: 79 additions & 35 deletions packages/gateway/src/gateway/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
#!/usr/bin/env bun

import type { IMessageQueue } from "@peerbot/core";
import type { IMessageQueue, WorkerTokenData } from "@peerbot/core";
import { createLogger, verifyWorkerToken } from "@peerbot/core";
import type { Request, Response } from "express";
import { WorkerConnectionManager } from "./connection-manager";
import { WorkerJobRouter } from "./job-router";
import { McpConfigService } from "../mcp/config-service";

const logger = createLogger("worker-gateway");

Expand All @@ -17,11 +18,13 @@ export class WorkerGateway {
private connectionManager: WorkerConnectionManager;
private jobRouter: WorkerJobRouter;
private queue: IMessageQueue;
private mcpConfigService?: McpConfigService;

constructor(queue: IMessageQueue) {
constructor(queue: IMessageQueue, mcpConfigService?: McpConfigService) {
this.queue = queue;
this.connectionManager = new WorkerConnectionManager();
this.jobRouter = new WorkerJobRouter(queue, this.connectionManager);
this.mcpConfigService = mcpConfigService;
}

/**
Expand All @@ -38,33 +41,25 @@ export class WorkerGateway {
this.handleWorkerResponse(req, res)
);

if (this.mcpConfigService) {
app.get("/worker/mcp/config", (req: Request, res: Response) =>
this.handleMcpConfigRequest(req, res)
);
}

logger.info("Worker gateway routes registered");
}

/**
* Handle SSE connection from worker
*/
private async handleStreamConnection(req: Request, res: Response) {
const authHeader = req.headers.authorization;

// Verify worker token
if (!authHeader || !authHeader.startsWith("Bearer ")) {
res
.status(401)
.json({ error: "Missing or invalid authorization header" });
return;
}

const token = authHeader.substring(7);
const tokenData = verifyWorkerToken(token);

if (!tokenData) {
logger.warn("Invalid token");
res.status(401).json({ error: "Invalid token" });
const auth = this.authenticateWorker(req, res);
if (!auth) {
return;
}

const { deploymentName, userId, threadId } = tokenData;
const { deploymentName, userId, threadId } = auth.tokenData;

// Setup SSE
res.setHeader("Content-Type", "text/event-stream");
Expand Down Expand Up @@ -93,25 +88,12 @@ export class WorkerGateway {
* Handle HTTP response from worker
*/
private async handleWorkerResponse(req: Request, res: Response) {
const authHeader = req.headers.authorization;

// Verify worker token
if (!authHeader || !authHeader.startsWith("Bearer ")) {
res
.status(401)
.json({ error: "Missing or invalid authorization header" });
return;
}

const token = authHeader.substring(7);
const tokenData = verifyWorkerToken(token);

if (!tokenData) {
res.status(401).json({ error: "Invalid token" });
const auth = this.authenticateWorker(req, res);
if (!auth) {
return;
}

const { deploymentName } = tokenData;
const { deploymentName } = auth.tokenData;

// Update connection activity
this.connectionManager.touchConnection(deploymentName);
Expand All @@ -134,6 +116,68 @@ export class WorkerGateway {
}
}

private async handleMcpConfigRequest(req: Request, res: Response) {
if (!this.mcpConfigService) {
res.status(503).json({ error: "mcp_config_unavailable" });
return;
}

const auth = this.authenticateWorker(req, res);
if (!auth) {
return;
}

try {
const baseUrl = this.getRequestBaseUrl(req);
const config = await this.mcpConfigService.getWorkerConfig({
baseUrl,
workerToken: auth.token,
});
res.json(config);
} catch (error) {
logger.error("Failed to generate MCP config", { error });
res.status(500).json({ error: "mcp_config_error" });
}
}

private authenticateWorker(
req: Request,
res: Response
): { tokenData: WorkerTokenData; token: string } | null {
const authHeader = req.headers.authorization;

if (!authHeader || !authHeader.startsWith("Bearer ")) {
res
.status(401)
.json({ error: "Missing or invalid authorization header" });
return null;
}

const token = authHeader.substring(7);
const tokenData = verifyWorkerToken(token);

if (!tokenData) {
logger.warn("Invalid token");
res.status(401).json({ error: "Invalid token" });
return null;
}

return { tokenData, token };
}

private getRequestBaseUrl(req: Request): string {
const forwardedProto = req.headers["x-forwarded-proto"];
const protocolCandidate = Array.isArray(forwardedProto)
? forwardedProto[0]
: forwardedProto?.split(",")[0];
const protocol = (protocolCandidate || req.protocol || "http").trim();
const host = req.get("host");
if (host) {
return `${protocol}://${host}`;
}
return process.env.PEERBOT_PUBLIC_GATEWAY_URL || `${protocol}://localhost:8080`;
}

/**
* Get active worker connections
*/
Expand Down
91 changes: 73 additions & 18 deletions packages/gateway/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import { initSentry } from "@peerbot/core";
initSentry();

import http from "node:http";
import { existsSync } from "node:fs";
import path from "node:path";
import { Command } from "commander";
import { moduleRegistry } from "@peerbot/core";
import { createLogger, type OrchestratorConfig } from "@peerbot/core";
import { LogLevel } from "@slack/bolt";
Expand All @@ -19,6 +22,7 @@ import { Orchestrator } from "./orchestration";
import type { AnthropicProxy } from "./proxy/anthropic-proxy";
import { SlackDispatcher } from "./slack";
import type { DispatcherConfig } from "./types";
import type { McpProxy } from "./mcp/proxy";

let healthServer: http.Server | null = null;

Expand All @@ -27,7 +31,8 @@ let healthServer: http.Server | null = null;
*/
function setupHealthEndpoints(
anthropicProxy?: AnthropicProxy,
workerGateway?: WorkerGateway
workerGateway?: WorkerGateway,
mcpProxy?: McpProxy
) {
if (healthServer) return;

Expand Down Expand Up @@ -63,6 +68,11 @@ function setupHealthEndpoints(
logger.info("✅ Worker gateway routes enabled at :8080/worker/*");
}

if (mcpProxy) {
mcpProxy.setupRoutes(proxyApp);
logger.info("✅ MCP proxy routes enabled at :8080/mcp/*");
}

// Register module endpoints
moduleRegistry.registerEndpoints(proxyApp);
logger.info("✅ Module endpoints registered");
Expand All @@ -82,13 +92,38 @@ function setupHealthEndpoints(
/**
* Main entry point
*/
async function main() {
type StartOptions = {
env?: string;
};

class MissingRequiredEnvError extends Error {
constructor(public envName: string | string[]) {
const message = Array.isArray(envName)
? `Missing one of the required environment variables: ${envName.join(", ")}`
: `Missing required environment variable: ${envName}`;
super(message);
this.name = "MissingRequiredEnvError";
}
}

async function startGateway({ env }: StartOptions = {}) {
try {
// Load environment variables from .env file (searches up from cwd)
// In Docker/K8s, env vars are injected by container runtime
if (process.env.NODE_ENV !== "production") {
dotenvConfig();
const envProvided = Boolean(env);
const envPath = envProvided
? path.resolve(process.cwd(), env!)
: path.resolve(process.cwd(), ".env");

if (existsSync(envPath)) {
dotenvConfig({ path: envPath });
logger.debug(`Loaded environment variables from ${envPath}`);
} else if (envProvided) {
logger.warn(`Specified env file ${envPath} was not found; continuing without it.`);
} else {
logger.debug("No .env file found; relying on process environment.");
}
}

logger.info("🚀 Starting Claude Code Slack Dispatcher");

// Get bot token from environment
Expand All @@ -101,9 +136,19 @@ async function main() {
signingSecret: `${process.env.SLACK_SIGNING_SECRET?.substring(0, 10)}...`,
});

const connectionString =
process.env.QUEUE_URL || process.env.DATABASE_URL;
if (!connectionString) {
throw new MissingRequiredEnvError(["QUEUE_URL", "DATABASE_URL"]);
}

if (!botToken) {
throw new MissingRequiredEnvError("SLACK_BOT_TOKEN");
}

const config: DispatcherConfig = {
slack: {
token: botToken!,
token: botToken,
appToken: process.env.SLACK_APP_TOKEN,
signingSecret: process.env.SLACK_SIGNING_SECRET,
socketMode: process.env.SLACK_HTTP_MODE !== "true",
Expand All @@ -125,7 +170,7 @@ async function main() {
logLevel: (process.env.LOG_LEVEL as any) || LogLevel.INFO,
// Queue configuration (required)
queues: {
connectionString: process.env.QUEUE_URL || process.env.DATABASE_URL!,
connectionString,
directMessage: process.env.QUEUE_DIRECT_MESSAGE || "direct_message",
messageQueue: process.env.QUEUE_MESSAGE_QUEUE || "message_queue",
retryLimit: parseInt(process.env.PGBOSS_RETRY_LIMIT || "3", 10),
Expand All @@ -149,13 +194,6 @@ async function main() {
});

// Validate required configuration
if (!config.slack.token) {
throw new Error("SLACK_BOT_TOKEN is required");
}
if (!config.queues.connectionString) {
throw new Error("QUEUE_URL is required");
}

// Create orchestrator configuration
const orchestratorConfig: OrchestratorConfig = {
queues: {
Expand Down Expand Up @@ -218,7 +256,8 @@ async function main() {
// Setup health endpoints on port 8080
setupHealthEndpoints(
dispatcher.getAnthropicProxy(),
dispatcher.getWorkerGateway()
dispatcher.getWorkerGateway(),
dispatcher.getMcpProxy()
);

// Setup graceful shutdown for orchestrator
Expand All @@ -237,12 +276,28 @@ async function main() {
logger.info("Health check:", JSON.stringify(status, null, 2));
});
} catch (error) {
logger.error("❌ Failed to start Slack Dispatcher:", error);
if (error instanceof MissingRequiredEnvError) {
logger.error(error.message);
} else {
logger.error("❌ Failed to start Slack Dispatcher:", error);
}
process.exit(1);
}
}

// Start the application
main();
const program = new Command();

program
.name("peerbot-gateway")
.description("Peerbot gateway service")
.option("--env <path>", "Path to environment file")
.action(async (options: StartOptions) => {
await startGateway(options);
});

program.parseAsync(process.argv).catch((error) => {
logger.error("❌ Failed to start Slack Dispatcher:", error);
process.exit(1);
});

export type { DispatcherConfig } from "./types";
Loading
Loading