diff --git a/apps/mesh/src/api/app.ts b/apps/mesh/src/api/app.ts index 05a620af2f..643412516a 100644 --- a/apps/mesh/src/api/app.ts +++ b/apps/mesh/src/api/app.ts @@ -115,7 +115,7 @@ import { getToolsByCategory, MANAGEMENT_TOOLS } from "../tools/registry"; import { Env } from "./env"; import { devLogger } from "./utils/dev-logger"; import { streamSSE } from "hono/streaming"; -import { SSEEvent, sseHub } from "@/event-bus/sse-hub"; +import { type SSEEvent, sseHub } from "../event-bus"; const getHandleOAuthProtectedResourceMetadata = () => oAuthProtectedResourceMetadata(auth); const getHandleOAuthDiscoveryMetadata = () => oAuthDiscoveryMetadata(auth); @@ -147,7 +147,7 @@ export interface CreateAppOptions { export async function createApp(options: CreateAppOptions = {}) { const database = options.database ?? getDb(); - // Stop any existing event bus worker (cleanup during HMR) + // Stop any existing event bus worker and SSE hub (cleanup during HMR) if (currentEventBus && currentEventBus.isRunning()) { console.log("[EventBus] Stopping previous worker (HMR cleanup)"); // Fire and forget - don't block app creation @@ -155,6 +155,12 @@ export async function createApp(options: CreateAppOptions = {}) { Promise.resolve(currentEventBus.stop()).catch((error) => { console.error("[EventBus] Error stopping previous worker:", error); }); + sseHub.stop().catch((error) => { + console.error( + "[SSEHub] Error stopping previous broadcast (HMR cleanup):", + error, + ); + }); } // Create event bus with a lazy context getter @@ -164,6 +170,12 @@ export async function createApp(options: CreateAppOptions = {}) { if (options.eventBus) { eventBus = options.eventBus; + sseHub.start().catch((error) => { + console.error( + "[SSEHub] Error starting broadcast (custom eventBus):", + error, + ); + }); } else { // Create notify function that uses the context factory // This is called by the worker to deliver events to subscribers diff --git a/apps/mesh/src/event-bus/index.ts b/apps/mesh/src/event-bus/index.ts index 03661c4fa9..d4e1b9dafb 100644 --- a/apps/mesh/src/event-bus/index.ts +++ b/apps/mesh/src/event-bus/index.ts @@ -11,6 +11,9 @@ * - nats: NatsNotifyStrategy + polling safety net * - postgres: PostgresNotifyStrategy (LISTEN/NOTIFY) + polling safety net * - polling: PollingStrategy only + * - SSEBroadcastStrategy: Cross-pod SSE fan-out (selected alongside NotifyStrategy) + * - nats: NatsSSEBroadcast (events replicated via NATS pub/sub) + * - default: LocalSSEBroadcast (in-memory only, single process) * * Usage: * ```ts @@ -28,9 +31,12 @@ import { type EventBusConfig, } from "./interface"; import { NatsNotifyStrategy } from "./nats-notify"; +import { NatsSSEBroadcast } from "./nats-sse-broadcast"; import { compose } from "./notify-strategy"; import { PollingStrategy } from "./polling"; import { PostgresNotifyStrategy } from "./postgres-notify"; +import { LocalSSEBroadcast } from "./sse-broadcast-strategy"; +import { sseHub } from "./sse-hub"; // Re-export types and interfaces export { @@ -52,6 +58,8 @@ export type { EventBus } from "./interface"; export type { NotifyStrategy } from "./notify-strategy"; +export { sseHub, type SSEEvent } from "./sse-hub"; + /** * Notify strategy selection. * @@ -68,6 +76,10 @@ export type { NotifyStrategy } from "./notify-strategy"; * In all cases except "polling", a PollingStrategy is composed as a safety * net to pick up scheduled retries and deliveries that may be missed by the * primary pubsub mechanism. + * + * SSE broadcast strategy follows the same resolution: + * - NATS available → NatsSSEBroadcast (cross-pod fan-out) + * - Otherwise → LocalSSEBroadcast (in-memory only) */ type NotifyStrategyName = "nats" | "postgres" | "polling"; @@ -90,9 +102,11 @@ function resolveNotifyStrategy(database: MeshDatabase): NotifyStrategyName { } /** - * Create an EventBus instance. + * Create an EventBus instance and start the SSE hub with the appropriate + * broadcast strategy. * - * Notify strategy is selected based on NOTIFY_STRATEGY and NATS_URL env vars. + * Notify strategy and SSE broadcast strategy are selected based on + * NOTIFY_STRATEGY and NATS_URL env vars. * See resolveNotifyStrategy for full selection logic. * * @param database - MeshDatabase instance (discriminated union) @@ -109,11 +123,11 @@ export function createEventBus( const strategyName = resolveNotifyStrategy(database); const polling = new PollingStrategy(pollIntervalMs); + const natsUrl = process.env.NATS_URL; let notifyStrategy; switch (strategyName) { case "nats": { - const natsUrl = process.env.NATS_URL; if (!natsUrl) { throw new Error( "[EventBus] NOTIFY_STRATEGY=nats requires NATS_URL to be set", @@ -154,6 +168,22 @@ export function createEventBus( notifyStrategy = polling; } + // Start SSE hub with the appropriate broadcast strategy. + // NATS available → cross-pod fan-out; otherwise → local only. + const sseBroadcast = natsUrl + ? new NatsSSEBroadcast({ servers: natsUrl }) + : new LocalSSEBroadcast(); + + sseHub.start(sseBroadcast).catch((err) => { + console.error("[SSEHub] Failed to start broadcast strategy:", err); + }); + + if (natsUrl) { + console.log("[SSEHub] Using NATS SSE broadcast (cross-pod)"); + } else { + console.log("[SSEHub] Using local SSE broadcast (single-pod)"); + } + return new EventBusImpl({ storage, config, diff --git a/apps/mesh/src/event-bus/nats-sse-broadcast.ts b/apps/mesh/src/event-bus/nats-sse-broadcast.ts new file mode 100644 index 0000000000..a4a8bb348d --- /dev/null +++ b/apps/mesh/src/event-bus/nats-sse-broadcast.ts @@ -0,0 +1,115 @@ +/** + * NATS SSE Broadcast Strategy + * + * Broadcasts SSE events across pods via NATS Core pub/sub. + * Each pod subscribes to a shared subject and calls localEmit + * when it receives a message, so SSE clients on every pod get the event. + * + * Uses a per-instance origin ID to avoid double-emitting on the publisher pod. + */ + +import { connect, type NatsConnection, type Subscription } from "nats"; +import type { SSEEvent } from "./sse-hub"; +import type { + LocalEmitFn, + SSEBroadcastStrategy, +} from "./sse-broadcast-strategy"; + +const SUBJECT = "mesh.sse.broadcast"; + +interface NatsSSEMessage { + originId: string; + organizationId: string; + event: SSEEvent; +} + +export interface NatsSSEBroadcastOptions { + servers: string | string[]; +} + +export class NatsSSEBroadcast implements SSEBroadcastStrategy { + private nc: NatsConnection | null = null; + private sub: Subscription | null = null; + private localEmit: LocalEmitFn | null = null; + private startPromise: Promise | null = null; + private readonly originId = crypto.randomUUID(); + private readonly encoder = new TextEncoder(); + + constructor(private readonly options: NatsSSEBroadcastOptions) {} + + async start(localEmit: LocalEmitFn): Promise { + if (this.nc) return; + if (this.startPromise) return this.startPromise; + + this.startPromise = this._doStart(localEmit); + try { + await this.startPromise; + } finally { + this.startPromise = null; + } + } + + private async _doStart(localEmit: LocalEmitFn): Promise { + this.localEmit = localEmit; + this.nc = await connect({ servers: this.options.servers }); + this.sub = this.nc.subscribe(SUBJECT); + + const decoder = new TextDecoder(); + + (async () => { + for await (const msg of this.sub!) { + try { + const parsed = JSON.parse(decoder.decode(msg.data)); + if ( + typeof parsed?.originId !== "string" || + typeof parsed?.organizationId !== "string" || + typeof parsed?.event?.id !== "string" || + typeof parsed?.event?.type !== "string" + ) { + continue; + } + if (parsed.originId === this.originId) continue; + this.localEmit?.(parsed.organizationId, parsed.event as SSEEvent); + } catch { + // Malformed message — skip + } + } + })().catch((err) => { + console.error("[NatsSSEBroadcast] Subscription error:", err); + }); + + console.log("[NatsSSEBroadcast] Started, subscribed to", SUBJECT); + } + + broadcast(organizationId: string, event: SSEEvent): void { + // Always emit locally first (fast path for SSE clients on this pod) + this.localEmit?.(organizationId, event); + + if (!this.nc) return; + + const payload: NatsSSEMessage = { + originId: this.originId, + organizationId, + event, + }; + + try { + this.nc.publish(SUBJECT, this.encoder.encode(JSON.stringify(payload))); + } catch (err) { + console.warn("[NatsSSEBroadcast] Publish failed (non-critical):", err); + } + } + + async stop(): Promise { + this.sub?.unsubscribe(); + this.sub = null; + this.localEmit = null; + + if (this.nc) { + await this.nc.drain(); + this.nc = null; + } + + console.log("[NatsSSEBroadcast] Stopped"); + } +} diff --git a/apps/mesh/src/event-bus/sse-broadcast-strategy.ts b/apps/mesh/src/event-bus/sse-broadcast-strategy.ts new file mode 100644 index 0000000000..369d41a57d --- /dev/null +++ b/apps/mesh/src/event-bus/sse-broadcast-strategy.ts @@ -0,0 +1,55 @@ +/** + * SSE Broadcast Strategy Interface + * + * Abstraction for how SSE events are broadcast across processes. + * In a single-process deployment, events are emitted locally. + * In multi-pod deployments (K8s), a cross-process strategy (e.g., NATS) + * ensures SSE clients on any pod receive events published from any other pod. + * + * Mirrors the NotifyStrategy pattern used for event bus worker wake-up. + */ + +import type { SSEEvent } from "./sse-hub"; + +/** + * Callback that delivers an event to local SSE listeners. + * Provided by SSEHub when starting the strategy. + */ +export type LocalEmitFn = (organizationId: string, event: SSEEvent) => void; + +export interface SSEBroadcastStrategy { + /** + * Start the broadcast strategy. + * @param localEmit - Callback to deliver events to SSE listeners on this process + */ + start(localEmit: LocalEmitFn): Promise; + + /** + * Broadcast an event to all processes (including this one). + * The strategy is responsible for calling localEmit on every process. + */ + broadcast(organizationId: string, event: SSEEvent): void; + + /** Stop the strategy and release resources. */ + stop(): Promise; +} + +/** + * Local-only broadcast — events are emitted to the current process only. + * Suitable for single-process deployments and local development. + */ +export class LocalSSEBroadcast implements SSEBroadcastStrategy { + private localEmit: LocalEmitFn | null = null; + + async start(localEmit: LocalEmitFn): Promise { + this.localEmit = localEmit; + } + + broadcast(organizationId: string, event: SSEEvent): void { + this.localEmit?.(organizationId, event); + } + + async stop(): Promise { + this.localEmit = null; + } +} diff --git a/apps/mesh/src/event-bus/sse-hub.ts b/apps/mesh/src/event-bus/sse-hub.ts index 0a03c3a876..6b6c8c1de5 100644 --- a/apps/mesh/src/event-bus/sse-hub.ts +++ b/apps/mesh/src/event-bus/sse-hub.ts @@ -5,14 +5,24 @@ * When events are published through the EventBus, they are also pushed * to all connected SSE clients for the same organization. * + * Cross-pod support: + * The hub delegates broadcasting to an SSEBroadcastStrategy. In single-process + * mode (LocalSSEBroadcast), events stay in-memory. In multi-pod deployments + * (NatsSSEBroadcast), events are replicated to all pods via NATS pub/sub. + * * Design goals: * - Zero buffering: events are written directly to the stream * - Org-scoped: listeners are keyed by organizationId * - Bounded: max connections per org to prevent OOM * - Cleanup on disconnect: listeners removed when HTTP connection closes + * - Pluggable broadcast: strategy handles cross-process replication */ import type { Event } from "../storage/types"; +import { + LocalSSEBroadcast, + type SSEBroadcastStrategy, +} from "./sse-broadcast-strategy"; // ============================================================================ // Types @@ -53,16 +63,51 @@ const MAX_TOTAL_CONNECTIONS = 500; // ============================================================================ /** - * Global SSE hub for fan-out of event bus events to SSE connections. + * SSE hub for fan-out of event bus events to SSE connections. + * + * Holds references to active listener callbacks — no event data. + * Memory usage is proportional to connected SSE clients, not event volume. * - * This is a singleton — there's one hub per process. It holds no event data, - * only references to active listener callbacks. Memory usage is proportional - * to the number of connected SSE clients, not the number of events. + * The broadcast strategy controls whether events reach only this process + * (LocalSSEBroadcast) or all pods (NatsSSEBroadcast). */ class SSEHub { /** Listeners indexed by organizationId for fast lookup */ private listeners = new Map>(); private totalCount = 0; + private strategy: SSEBroadcastStrategy = new LocalSSEBroadcast(); + private started = false; + + /** + * Initialize the hub with a broadcast strategy and start it. + * Must be called before emit() for cross-pod broadcasting to work. + * + * If already started and a new strategy is provided, stops the current + * strategy first and restarts with the new one (safe for HMR). + * If already started with no new strategy, this is a no-op. + */ + async start(strategy?: SSEBroadcastStrategy): Promise { + if (this.started) { + if (!strategy) return; + await this.stop(); + } + + if (strategy) { + this.strategy = strategy; + } + + await this.strategy.start((orgId, event) => this.localEmit(orgId, event)); + this.started = true; + } + + /** + * Stop the broadcast strategy and release resources. + */ + async stop(): Promise { + if (!this.started) return; + await this.strategy.stop(); + this.started = false; + } /** * Register a new SSE listener for an organization. @@ -112,17 +157,38 @@ class SSEHub { } /** - * Fan out an event to all matching SSE listeners for the organization. + * Broadcast an event to all SSE listeners across all pods. * - * This is called from the EventBus publish path. It's synchronous and - * non-blocking — each listener's push callback writes to a ReadableStream. + * Delegates to the configured SSEBroadcastStrategy which handles + * both local delivery and cross-pod replication. */ emit(organizationId: string, event: SSEEvent): void { + this.strategy.broadcast(organizationId, event); + } + + /** + * Get the number of active listeners for an organization. + */ + countForOrg(organizationId: string): number { + return this.listeners.get(organizationId)?.size ?? 0; + } + + /** + * Get total active listener count. + */ + get count(): number { + return this.totalCount; + } + + /** + * Deliver an event to local SSE listeners only (called by the strategy). + * This is the actual fan-out to HTTP streams on this process. + */ + private localEmit(organizationId: string, event: SSEEvent): void { const orgListeners = this.listeners.get(organizationId); if (!orgListeners || orgListeners.size === 0) return; for (const listener of orgListeners.values()) { - // Apply type filter if specified if ( listener.typePatterns && !matchesAnyPattern(event.type, listener.typePatterns) @@ -133,25 +199,10 @@ class SSEHub { try { listener.push(event); } catch { - // Listener's stream is broken — remove it this.remove(organizationId, listener.id); } } } - - /** - * Get the number of active listeners for an organization. - */ - countForOrg(organizationId: string): number { - return this.listeners.get(organizationId)?.size ?? 0; - } - - /** - * Get total active listener count. - */ - get count(): number { - return this.totalCount; - } } // ============================================================================