From 401685ff5b52c61e7b44a0ced4174860de759a17 Mon Sep 17 00:00:00 2001 From: pedrofrxncx Date: Wed, 25 Feb 2026 23:35:32 -0300 Subject: [PATCH 1/3] feat(event-bus): add SSE broadcast strategy for cross-pod fan-out Introduce SSEBroadcastStrategy interface that decouples event broadcasting from the SSE hub, enabling cross-pod SSE delivery in multi-pod deployments. - SSEBroadcastStrategy interface with start/broadcast/stop lifecycle - LocalSSEBroadcast: in-memory single-process implementation - NatsSSEBroadcast: NATS pub/sub for cross-pod replication with per-instance origin ID to prevent double-emit on publisher pod - Refactor SSEHub to delegate broadcast to the pluggable strategy, separating localEmit (HTTP stream fan-out) from broadcast routing Made-with: Cursor --- apps/mesh/src/event-bus/nats-sse-broadcast.ts | 98 +++++++++++++++++++ .../src/event-bus/sse-broadcast-strategy.ts | 55 +++++++++++ apps/mesh/src/event-bus/sse-hub.ts | 91 ++++++++++++----- 3 files changed, 221 insertions(+), 23 deletions(-) create mode 100644 apps/mesh/src/event-bus/nats-sse-broadcast.ts create mode 100644 apps/mesh/src/event-bus/sse-broadcast-strategy.ts diff --git a/apps/mesh/src/event-bus/nats-sse-broadcast.ts b/apps/mesh/src/event-bus/nats-sse-broadcast.ts new file mode 100644 index 0000000000..7d0330d35e --- /dev/null +++ b/apps/mesh/src/event-bus/nats-sse-broadcast.ts @@ -0,0 +1,98 @@ +/** + * NATS SSE Broadcast Strategy + * + * Broadcasts SSE events across pods via NATS Core pub/sub. + * Each pod subscribes to a shared subject and calls localEmit + * when it receives a message, so SSE clients on every pod get the event. + * + * Uses a per-instance origin ID to avoid double-emitting on the publisher pod. 
+ */ + +import { connect, type NatsConnection, type Subscription } from "nats"; +import type { SSEEvent } from "./sse-hub"; +import type { + LocalEmitFn, + SSEBroadcastStrategy, +} from "./sse-broadcast-strategy"; + +const SUBJECT = "mesh.sse.broadcast"; + +interface NatsSSEMessage { + originId: string; + organizationId: string; + event: SSEEvent; +} + +export interface NatsSSEBroadcastOptions { + servers: string | string[]; +} + +export class NatsSSEBroadcast implements SSEBroadcastStrategy { + private nc: NatsConnection | null = null; + private sub: Subscription | null = null; + private localEmit: LocalEmitFn | null = null; + private readonly originId = crypto.randomUUID(); + + constructor(private readonly options: NatsSSEBroadcastOptions) {} + + async start(localEmit: LocalEmitFn): Promise { + if (this.nc) return; + + this.localEmit = localEmit; + this.nc = await connect({ servers: this.options.servers }); + this.sub = this.nc.subscribe(SUBJECT); + + const decoder = new TextDecoder(); + + (async () => { + for await (const msg of this.sub!) 
{ + try { + const payload: NatsSSEMessage = JSON.parse(decoder.decode(msg.data)); + if (payload.originId === this.originId) continue; + this.localEmit?.(payload.organizationId, payload.event); + } catch { + // Malformed message — skip + } + } + })().catch((err) => { + console.error("[NatsSSEBroadcast] Subscription error:", err); + }); + + console.log("[NatsSSEBroadcast] Started, subscribed to", SUBJECT); + } + + broadcast(organizationId: string, event: SSEEvent): void { + // Always emit locally first (fast path for SSE clients on this pod) + this.localEmit?.(organizationId, event); + + if (!this.nc) return; + + const payload: NatsSSEMessage = { + originId: this.originId, + organizationId, + event, + }; + + try { + this.nc.publish( + SUBJECT, + new TextEncoder().encode(JSON.stringify(payload)), + ); + } catch (err) { + console.warn("[NatsSSEBroadcast] Publish failed (non-critical):", err); + } + } + + async stop(): Promise { + this.sub?.unsubscribe(); + this.sub = null; + this.localEmit = null; + + if (this.nc) { + await this.nc.drain(); + this.nc = null; + } + + console.log("[NatsSSEBroadcast] Stopped"); + } +} diff --git a/apps/mesh/src/event-bus/sse-broadcast-strategy.ts b/apps/mesh/src/event-bus/sse-broadcast-strategy.ts new file mode 100644 index 0000000000..369d41a57d --- /dev/null +++ b/apps/mesh/src/event-bus/sse-broadcast-strategy.ts @@ -0,0 +1,55 @@ +/** + * SSE Broadcast Strategy Interface + * + * Abstraction for how SSE events are broadcast across processes. + * In a single-process deployment, events are emitted locally. + * In multi-pod deployments (K8s), a cross-process strategy (e.g., NATS) + * ensures SSE clients on any pod receive events published from any other pod. + * + * Mirrors the NotifyStrategy pattern used for event bus worker wake-up. + */ + +import type { SSEEvent } from "./sse-hub"; + +/** + * Callback that delivers an event to local SSE listeners. + * Provided by SSEHub when starting the strategy. 
+ */ +export type LocalEmitFn = (organizationId: string, event: SSEEvent) => void; + +export interface SSEBroadcastStrategy { + /** + * Start the broadcast strategy. + * @param localEmit - Callback to deliver events to SSE listeners on this process + */ + start(localEmit: LocalEmitFn): Promise; + + /** + * Broadcast an event to all processes (including this one). + * The strategy is responsible for calling localEmit on every process. + */ + broadcast(organizationId: string, event: SSEEvent): void; + + /** Stop the strategy and release resources. */ + stop(): Promise; +} + +/** + * Local-only broadcast — events are emitted to the current process only. + * Suitable for single-process deployments and local development. + */ +export class LocalSSEBroadcast implements SSEBroadcastStrategy { + private localEmit: LocalEmitFn | null = null; + + async start(localEmit: LocalEmitFn): Promise { + this.localEmit = localEmit; + } + + broadcast(organizationId: string, event: SSEEvent): void { + this.localEmit?.(organizationId, event); + } + + async stop(): Promise { + this.localEmit = null; + } +} diff --git a/apps/mesh/src/event-bus/sse-hub.ts b/apps/mesh/src/event-bus/sse-hub.ts index 0a03c3a876..c2bfe6b4fa 100644 --- a/apps/mesh/src/event-bus/sse-hub.ts +++ b/apps/mesh/src/event-bus/sse-hub.ts @@ -5,14 +5,24 @@ * When events are published through the EventBus, they are also pushed * to all connected SSE clients for the same organization. * + * Cross-pod support: + * The hub delegates broadcasting to an SSEBroadcastStrategy. In single-process + * mode (LocalSSEBroadcast), events stay in-memory. In multi-pod deployments + * (NatsSSEBroadcast), events are replicated to all pods via NATS pub/sub. 
+ * * Design goals: * - Zero buffering: events are written directly to the stream * - Org-scoped: listeners are keyed by organizationId * - Bounded: max connections per org to prevent OOM * - Cleanup on disconnect: listeners removed when HTTP connection closes + * - Pluggable broadcast: strategy handles cross-process replication */ import type { Event } from "../storage/types"; +import { + LocalSSEBroadcast, + type SSEBroadcastStrategy, +} from "./sse-broadcast-strategy"; // ============================================================================ // Types @@ -53,16 +63,45 @@ const MAX_TOTAL_CONNECTIONS = 500; // ============================================================================ /** - * Global SSE hub for fan-out of event bus events to SSE connections. + * SSE hub for fan-out of event bus events to SSE connections. + * + * Holds references to active listener callbacks — no event data. + * Memory usage is proportional to connected SSE clients, not event volume. * - * This is a singleton — there's one hub per process. It holds no event data, - * only references to active listener callbacks. Memory usage is proportional - * to the number of connected SSE clients, not the number of events. + * The broadcast strategy controls whether events reach only this process + * (LocalSSEBroadcast) or all pods (NatsSSEBroadcast). */ class SSEHub { /** Listeners indexed by organizationId for fast lookup */ private listeners = new Map>(); private totalCount = 0; + private strategy: SSEBroadcastStrategy = new LocalSSEBroadcast(); + private started = false; + + /** + * Initialize the hub with a broadcast strategy and start it. + * Must be called before emit() for cross-pod broadcasting to work. + * Safe to call multiple times — subsequent calls are no-ops. 
+ */ + async start(strategy?: SSEBroadcastStrategy): Promise { + if (this.started) return; + + if (strategy) { + this.strategy = strategy; + } + + await this.strategy.start((orgId, event) => this.localEmit(orgId, event)); + this.started = true; + } + + /** + * Stop the broadcast strategy and release resources. + */ + async stop(): Promise { + if (!this.started) return; + await this.strategy.stop(); + this.started = false; + } /** * Register a new SSE listener for an organization. @@ -112,17 +151,38 @@ class SSEHub { } /** - * Fan out an event to all matching SSE listeners for the organization. + * Broadcast an event to all SSE listeners across all pods. * - * This is called from the EventBus publish path. It's synchronous and - * non-blocking — each listener's push callback writes to a ReadableStream. + * Delegates to the configured SSEBroadcastStrategy which handles + * both local delivery and cross-pod replication. */ emit(organizationId: string, event: SSEEvent): void { + this.strategy.broadcast(organizationId, event); + } + + /** + * Get the number of active listeners for an organization. + */ + countForOrg(organizationId: string): number { + return this.listeners.get(organizationId)?.size ?? 0; + } + + /** + * Get total active listener count. + */ + get count(): number { + return this.totalCount; + } + + /** + * Deliver an event to local SSE listeners only (called by the strategy). + * This is the actual fan-out to HTTP streams on this process. 
+ */ + private localEmit(organizationId: string, event: SSEEvent): void { const orgListeners = this.listeners.get(organizationId); if (!orgListeners || orgListeners.size === 0) return; for (const listener of orgListeners.values()) { - // Apply type filter if specified if ( listener.typePatterns && !matchesAnyPattern(event.type, listener.typePatterns) @@ -133,25 +193,10 @@ class SSEHub { try { listener.push(event); } catch { - // Listener's stream is broken — remove it this.remove(organizationId, listener.id); } } } - - /** - * Get the number of active listeners for an organization. - */ - countForOrg(organizationId: string): number { - return this.listeners.get(organizationId)?.size ?? 0; - } - - /** - * Get total active listener count. - */ - get count(): number { - return this.totalCount; - } } // ============================================================================ From d5b937c7fc3e284f43f87a11ca6076d0b8f22f6e Mon Sep 17 00:00:00 2001 From: pedrofrxncx Date: Wed, 25 Feb 2026 23:35:38 -0300 Subject: [PATCH 2/3] feat(event-bus): wire SSE broadcast strategy into event bus lifecycle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Select SSE broadcast strategy alongside notify strategy at startup: NATS_URL present → NatsSSEBroadcast, otherwise → LocalSSEBroadcast. - createEventBus now starts sseHub with the resolved strategy - app.ts stops sseHub during HMR cleanup to prevent leaked connections Made-with: Cursor --- apps/mesh/src/api/app.ts | 8 +++++++- apps/mesh/src/event-bus/index.ts | 32 +++++++++++++++++++++++++++++--- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/apps/mesh/src/api/app.ts b/apps/mesh/src/api/app.ts index 05a620af2f..b321bb3b76 100644 --- a/apps/mesh/src/api/app.ts +++ b/apps/mesh/src/api/app.ts @@ -147,7 +147,7 @@ export interface CreateAppOptions { export async function createApp(options: CreateAppOptions = {}) { const database = options.database ?? 
getDb(); - // Stop any existing event bus worker (cleanup during HMR) + // Stop any existing event bus worker and SSE hub (cleanup during HMR) if (currentEventBus && currentEventBus.isRunning()) { console.log("[EventBus] Stopping previous worker (HMR cleanup)"); // Fire and forget - don't block app creation @@ -155,6 +155,12 @@ export async function createApp(options: CreateAppOptions = {}) { Promise.resolve(currentEventBus.stop()).catch((error) => { console.error("[EventBus] Error stopping previous worker:", error); }); + sseHub.stop().catch((error) => { + console.error( + "[SSEHub] Error stopping previous broadcast (HMR cleanup):", + error, + ); + }); } // Create event bus with a lazy context getter diff --git a/apps/mesh/src/event-bus/index.ts b/apps/mesh/src/event-bus/index.ts index 03661c4fa9..cbfa026baa 100644 --- a/apps/mesh/src/event-bus/index.ts +++ b/apps/mesh/src/event-bus/index.ts @@ -11,6 +11,9 @@ * - nats: NatsNotifyStrategy + polling safety net * - postgres: PostgresNotifyStrategy (LISTEN/NOTIFY) + polling safety net * - polling: PollingStrategy only + * - SSEBroadcastStrategy: Cross-pod SSE fan-out (selected alongside NotifyStrategy) + * - nats: NatsSSEBroadcast (events replicated via NATS pub/sub) + * - default: LocalSSEBroadcast (in-memory only, single process) * * Usage: * ```ts @@ -28,9 +31,12 @@ import { type EventBusConfig, } from "./interface"; import { NatsNotifyStrategy } from "./nats-notify"; +import { NatsSSEBroadcast } from "./nats-sse-broadcast"; import { compose } from "./notify-strategy"; import { PollingStrategy } from "./polling"; import { PostgresNotifyStrategy } from "./postgres-notify"; +import { LocalSSEBroadcast } from "./sse-broadcast-strategy"; +import { sseHub } from "./sse-hub"; // Re-export types and interfaces export { @@ -68,6 +74,10 @@ export type { NotifyStrategy } from "./notify-strategy"; * In all cases except "polling", a PollingStrategy is composed as a safety * net to pick up scheduled retries and deliveries that 
may be missed by the * primary pubsub mechanism. + * + * SSE broadcast strategy is resolved separately, from NATS_URL alone: + * - NATS_URL set → NatsSSEBroadcast (cross-pod fan-out) + * - Otherwise → LocalSSEBroadcast (in-memory only) */ type NotifyStrategyName = "nats" | "postgres" | "polling"; @@ -90,9 +100,11 @@ function resolveNotifyStrategy(database: MeshDatabase): NotifyStrategyName { } /** - * Create an EventBus instance. + * Create an EventBus instance and start the SSE hub with the appropriate + * broadcast strategy. * - * Notify strategy is selected based on NOTIFY_STRATEGY and NATS_URL env vars. + * SSE broadcast uses NATS whenever NATS_URL is set; the notify strategy is + * selected based on NOTIFY_STRATEGY and NATS_URL env vars. * See resolveNotifyStrategy for full selection logic. * * @param database - MeshDatabase instance (discriminated union) @@ -109,11 +121,11 @@ export function createEventBus( const strategyName = resolveNotifyStrategy(database); const polling = new PollingStrategy(pollIntervalMs); + const natsUrl = process.env.NATS_URL; let notifyStrategy; switch (strategyName) { case "nats": { - const natsUrl = process.env.NATS_URL; if (!natsUrl) { throw new Error( "[EventBus] NOTIFY_STRATEGY=nats requires NATS_URL to be set", @@ -154,6 +166,20 @@ export function createEventBus( notifyStrategy = polling; } + // Start SSE hub with the appropriate broadcast strategy. + // NATS available → cross-pod fan-out; otherwise → local only. + const sseBroadcast = natsUrl + ? 
new NatsSSEBroadcast({ servers: natsUrl }) + : new LocalSSEBroadcast(); + + sseHub.start(sseBroadcast).catch((err) => { + console.error("[SSEHub] Failed to start broadcast strategy:", err); + }); + + if (natsUrl) { + console.log("[SSEHub] Using NATS SSE broadcast (cross-pod)"); + } + return new EventBusImpl({ storage, config, From 3d7c4f99361416f066dd624ff99f900c197946c4 Mon Sep 17 00:00:00 2001 From: pedrofrxncx Date: Thu, 26 Feb 2026 00:04:22 -0300 Subject: [PATCH 3/3] feat(event-bus): wire SSE broadcast strategy into event bus lifecycle - Start SSE hub with LocalSSEBroadcast by default when creating EventBus - Add explicit logging for single-pod vs cross-pod SSE broadcast modes - Ensure SSE hub initializes even when custom EventBus is provided - Export SSEEvent and sseHub from event-bus module for public API - Deduplicate concurrent NatsSSEBroadcast.start() calls and validate incoming NATS payloads before emitting them locally - Restart the SSE hub when start() is given a new strategy (HMR-safe) Made-with: Cursor --- apps/mesh/src/api/app.ts | 8 ++++- apps/mesh/src/event-bus/index.ts | 4 +++ apps/mesh/src/event-bus/nats-sse-broadcast.ts | 31 ++++++++++++++----- apps/mesh/src/event-bus/sse-hub.ts | 10 ++++-- 4 files changed, 43 insertions(+), 10 deletions(-) diff --git a/apps/mesh/src/api/app.ts b/apps/mesh/src/api/app.ts index b321bb3b76..643412516a 100644 --- a/apps/mesh/src/api/app.ts +++ b/apps/mesh/src/api/app.ts @@ -115,7 +115,7 @@ import { getToolsByCategory, MANAGEMENT_TOOLS } from "../tools/registry"; import { Env } from "./env"; import { devLogger } from "./utils/dev-logger"; import { streamSSE } from "hono/streaming"; -import { SSEEvent, sseHub } from "@/event-bus/sse-hub"; +import { type SSEEvent, sseHub } from "../event-bus"; const getHandleOAuthProtectedResourceMetadata = () => oAuthProtectedResourceMetadata(auth); const getHandleOAuthDiscoveryMetadata = () => oAuthDiscoveryMetadata(auth); @@ -170,6 +170,12 @@ export async function createApp(options: CreateAppOptions = {}) { if (options.eventBus) { eventBus = options.eventBus; + sseHub.start().catch((error) => { + console.error( + "[SSEHub] Error starting broadcast (custom eventBus):", 
+ error, + ); + }); } else { // Create notify function that uses the context factory // This is called by the worker to deliver events to subscribers diff --git a/apps/mesh/src/event-bus/index.ts b/apps/mesh/src/event-bus/index.ts index cbfa026baa..d4e1b9dafb 100644 --- a/apps/mesh/src/event-bus/index.ts +++ b/apps/mesh/src/event-bus/index.ts @@ -58,6 +58,8 @@ export type { EventBus } from "./interface"; export type { NotifyStrategy } from "./notify-strategy"; +export { sseHub, type SSEEvent } from "./sse-hub"; + /** * Notify strategy selection. * @@ -178,6 +180,8 @@ export function createEventBus( if (natsUrl) { console.log("[SSEHub] Using NATS SSE broadcast (cross-pod)"); + } else { + console.log("[SSEHub] Using local SSE broadcast (single-pod)"); } return new EventBusImpl({ diff --git a/apps/mesh/src/event-bus/nats-sse-broadcast.ts b/apps/mesh/src/event-bus/nats-sse-broadcast.ts index 7d0330d35e..a4a8bb348d 100644 --- a/apps/mesh/src/event-bus/nats-sse-broadcast.ts +++ b/apps/mesh/src/event-bus/nats-sse-broadcast.ts @@ -31,13 +31,25 @@ export class NatsSSEBroadcast implements SSEBroadcastStrategy { private nc: NatsConnection | null = null; private sub: Subscription | null = null; private localEmit: LocalEmitFn | null = null; + private startPromise: Promise | null = null; private readonly originId = crypto.randomUUID(); + private readonly encoder = new TextEncoder(); constructor(private readonly options: NatsSSEBroadcastOptions) {} async start(localEmit: LocalEmitFn): Promise { if (this.nc) return; + if (this.startPromise) return this.startPromise; + this.startPromise = this._doStart(localEmit); + try { + await this.startPromise; + } finally { + this.startPromise = null; + } + } + + private async _doStart(localEmit: LocalEmitFn): Promise { this.localEmit = localEmit; this.nc = await connect({ servers: this.options.servers }); this.sub = this.nc.subscribe(SUBJECT); @@ -47,9 +59,17 @@ export class NatsSSEBroadcast implements SSEBroadcastStrategy { (async () => { 
for await (const msg of this.sub!) { try { - const payload: NatsSSEMessage = JSON.parse(decoder.decode(msg.data)); - if (payload.originId === this.originId) continue; - this.localEmit?.(payload.organizationId, payload.event); + const parsed = JSON.parse(decoder.decode(msg.data)); + if ( + typeof parsed?.originId !== "string" || + typeof parsed?.organizationId !== "string" || + typeof parsed?.event?.id !== "string" || + typeof parsed?.event?.type !== "string" + ) { + continue; + } + if (parsed.originId === this.originId) continue; + this.localEmit?.(parsed.organizationId, parsed.event as SSEEvent); } catch { // Malformed message — skip } @@ -74,10 +94,7 @@ export class NatsSSEBroadcast implements SSEBroadcastStrategy { }; try { - this.nc.publish( - SUBJECT, - new TextEncoder().encode(JSON.stringify(payload)), - ); + this.nc.publish(SUBJECT, this.encoder.encode(JSON.stringify(payload))); } catch (err) { console.warn("[NatsSSEBroadcast] Publish failed (non-critical):", err); } diff --git a/apps/mesh/src/event-bus/sse-hub.ts b/apps/mesh/src/event-bus/sse-hub.ts index c2bfe6b4fa..6b6c8c1de5 100644 --- a/apps/mesh/src/event-bus/sse-hub.ts +++ b/apps/mesh/src/event-bus/sse-hub.ts @@ -81,10 +81,16 @@ class SSEHub { /** * Initialize the hub with a broadcast strategy and start it. * Must be called before emit() for cross-pod broadcasting to work. - * Safe to call multiple times — subsequent calls are no-ops. + * + * If already started and a new strategy is provided, stops the current + * strategy first and restarts with the new one (safe for HMR). + * If already started with no new strategy, this is a no-op. */ async start(strategy?: SSEBroadcastStrategy): Promise { - if (this.started) return; + if (this.started) { + if (!strategy) return; + await this.stop(); + } if (strategy) { this.strategy = strategy;