Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions apps/mesh/src/api/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ import { getToolsByCategory, MANAGEMENT_TOOLS } from "../tools/registry";
import { Env } from "./env";
import { devLogger } from "./utils/dev-logger";
import { streamSSE } from "hono/streaming";
import { SSEEvent, sseHub } from "@/event-bus/sse-hub";
import { type SSEEvent, sseHub } from "../event-bus";
const getHandleOAuthProtectedResourceMetadata = () =>
oAuthProtectedResourceMetadata(auth);
const getHandleOAuthDiscoveryMetadata = () => oAuthDiscoveryMetadata(auth);
Expand Down Expand Up @@ -147,14 +147,20 @@ export interface CreateAppOptions {
export async function createApp(options: CreateAppOptions = {}) {
const database = options.database ?? getDb();

// Stop any existing event bus worker (cleanup during HMR)
// Stop any existing event bus worker and SSE hub (cleanup during HMR)
if (currentEventBus && currentEventBus.isRunning()) {
console.log("[EventBus] Stopping previous worker (HMR cleanup)");
// Fire and forget - don't block app creation
// The stop is mostly synchronous, async part is just UNLISTEN cleanup
Promise.resolve(currentEventBus.stop()).catch((error) => {
console.error("[EventBus] Error stopping previous worker:", error);
});
sseHub.stop().catch((error) => {
console.error(
"[SSEHub] Error stopping previous broadcast (HMR cleanup):",
error,
);
});
}

// Create event bus with a lazy context getter
Expand All @@ -164,6 +170,12 @@ export async function createApp(options: CreateAppOptions = {}) {

if (options.eventBus) {
eventBus = options.eventBus;
sseHub.start().catch((error) => {
console.error(
"[SSEHub] Error starting broadcast (custom eventBus):",
error,
);
});
} else {
// Create notify function that uses the context factory
// This is called by the worker to deliver events to subscribers
Expand Down
36 changes: 33 additions & 3 deletions apps/mesh/src/event-bus/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
* - nats: NatsNotifyStrategy + polling safety net
* - postgres: PostgresNotifyStrategy (LISTEN/NOTIFY) + polling safety net
* - polling: PollingStrategy only
* - SSEBroadcastStrategy: Cross-pod SSE fan-out (selected alongside NotifyStrategy)
* - nats: NatsSSEBroadcast (events replicated via NATS pub/sub)
* - default: LocalSSEBroadcast (in-memory only, single process)
*
* Usage:
* ```ts
Expand All @@ -28,9 +31,12 @@ import {
type EventBusConfig,
} from "./interface";
import { NatsNotifyStrategy } from "./nats-notify";
import { NatsSSEBroadcast } from "./nats-sse-broadcast";
import { compose } from "./notify-strategy";
import { PollingStrategy } from "./polling";
import { PostgresNotifyStrategy } from "./postgres-notify";
import { LocalSSEBroadcast } from "./sse-broadcast-strategy";
import { sseHub } from "./sse-hub";

// Re-export types and interfaces
export {
Expand All @@ -52,6 +58,8 @@ export type { EventBus } from "./interface";

export type { NotifyStrategy } from "./notify-strategy";

export { sseHub, type SSEEvent } from "./sse-hub";

/**
* Notify strategy selection.
*
Expand All @@ -68,6 +76,10 @@ export type { NotifyStrategy } from "./notify-strategy";
* In all cases except "polling", a PollingStrategy is composed as a safety
* net to pick up scheduled retries and deliveries that may be missed by the
* primary pubsub mechanism.
*
* SSE broadcast strategy follows the same resolution:
* - NATS available → NatsSSEBroadcast (cross-pod fan-out)
* - Otherwise → LocalSSEBroadcast (in-memory only)
*/
type NotifyStrategyName = "nats" | "postgres" | "polling";

Expand All @@ -90,9 +102,11 @@ function resolveNotifyStrategy(database: MeshDatabase): NotifyStrategyName {
}

/**
* Create an EventBus instance.
* Create an EventBus instance and start the SSE hub with the appropriate
* broadcast strategy.
*
* Notify strategy is selected based on NOTIFY_STRATEGY and NATS_URL env vars.
* Notify strategy and SSE broadcast strategy are selected based on
* NOTIFY_STRATEGY and NATS_URL env vars.
* See resolveNotifyStrategy for full selection logic.
*
* @param database - MeshDatabase instance (discriminated union)
Expand All @@ -109,11 +123,11 @@ export function createEventBus(

const strategyName = resolveNotifyStrategy(database);
const polling = new PollingStrategy(pollIntervalMs);
const natsUrl = process.env.NATS_URL;

let notifyStrategy;
switch (strategyName) {
case "nats": {
const natsUrl = process.env.NATS_URL;
if (!natsUrl) {
throw new Error(
"[EventBus] NOTIFY_STRATEGY=nats requires NATS_URL to be set",
Expand Down Expand Up @@ -154,6 +168,22 @@ export function createEventBus(
notifyStrategy = polling;
}

// Start SSE hub with the appropriate broadcast strategy.
// NATS available → cross-pod fan-out; otherwise → local only.
const sseBroadcast = natsUrl
? new NatsSSEBroadcast({ servers: natsUrl })
: new LocalSSEBroadcast();

sseHub.start(sseBroadcast).catch((err) => {
console.error("[SSEHub] Failed to start broadcast strategy:", err);
});

if (natsUrl) {
console.log("[SSEHub] Using NATS SSE broadcast (cross-pod)");
} else {
console.log("[SSEHub] Using local SSE broadcast (single-pod)");
}

return new EventBusImpl({
storage,
config,
Expand Down
115 changes: 115 additions & 0 deletions apps/mesh/src/event-bus/nats-sse-broadcast.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/**
* NATS SSE Broadcast Strategy
*
* Broadcasts SSE events across pods via NATS Core pub/sub.
* Each pod subscribes to a shared subject and calls localEmit
* when it receives a message, so SSE clients on every pod get the event.
*
* Uses a per-instance origin ID to avoid double-emitting on the publisher pod.
*/

import { connect, type NatsConnection, type Subscription } from "nats";
import type { SSEEvent } from "./sse-hub";
import type {
LocalEmitFn,
SSEBroadcastStrategy,
} from "./sse-broadcast-strategy";

const SUBJECT = "mesh.sse.broadcast";

interface NatsSSEMessage {
originId: string;
organizationId: string;
event: SSEEvent;
}

export interface NatsSSEBroadcastOptions {
servers: string | string[];
}

export class NatsSSEBroadcast implements SSEBroadcastStrategy {
private nc: NatsConnection | null = null;
private sub: Subscription | null = null;
private localEmit: LocalEmitFn | null = null;
private startPromise: Promise<void> | null = null;
private readonly originId = crypto.randomUUID();
private readonly encoder = new TextEncoder();

constructor(private readonly options: NatsSSEBroadcastOptions) {}

async start(localEmit: LocalEmitFn): Promise<void> {
if (this.nc) return;
if (this.startPromise) return this.startPromise;

this.startPromise = this._doStart(localEmit);
try {
await this.startPromise;
} finally {
this.startPromise = null;
}
}

private async _doStart(localEmit: LocalEmitFn): Promise<void> {
this.localEmit = localEmit;
this.nc = await connect({ servers: this.options.servers });
this.sub = this.nc.subscribe(SUBJECT);

const decoder = new TextDecoder();

(async () => {
for await (const msg of this.sub!) {
try {
const parsed = JSON.parse(decoder.decode(msg.data));
if (
typeof parsed?.originId !== "string" ||
typeof parsed?.organizationId !== "string" ||
typeof parsed?.event?.id !== "string" ||
typeof parsed?.event?.type !== "string"
) {
continue;
}
if (parsed.originId === this.originId) continue;
this.localEmit?.(parsed.organizationId, parsed.event as SSEEvent);
} catch {
// Malformed message — skip
}
}
})().catch((err) => {
console.error("[NatsSSEBroadcast] Subscription error:", err);
});

console.log("[NatsSSEBroadcast] Started, subscribed to", SUBJECT);
}

broadcast(organizationId: string, event: SSEEvent): void {
// Always emit locally first (fast path for SSE clients on this pod)
this.localEmit?.(organizationId, event);

if (!this.nc) return;

const payload: NatsSSEMessage = {
originId: this.originId,
organizationId,
event,
};

try {
this.nc.publish(SUBJECT, this.encoder.encode(JSON.stringify(payload)));
} catch (err) {
console.warn("[NatsSSEBroadcast] Publish failed (non-critical):", err);
}
}

async stop(): Promise<void> {
this.sub?.unsubscribe();
this.sub = null;
this.localEmit = null;

if (this.nc) {
await this.nc.drain();
this.nc = null;
}

console.log("[NatsSSEBroadcast] Stopped");
}
}
55 changes: 55 additions & 0 deletions apps/mesh/src/event-bus/sse-broadcast-strategy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* SSE Broadcast Strategy Interface
*
* Abstraction for how SSE events are broadcast across processes.
* In a single-process deployment, events are emitted locally.
* In multi-pod deployments (K8s), a cross-process strategy (e.g., NATS)
* ensures SSE clients on any pod receive events published from any other pod.
*
* Mirrors the NotifyStrategy pattern used for event bus worker wake-up.
*/

import type { SSEEvent } from "./sse-hub";

/**
* Callback that delivers an event to local SSE listeners.
* Provided by SSEHub when starting the strategy.
*/
export type LocalEmitFn = (organizationId: string, event: SSEEvent) => void;

export interface SSEBroadcastStrategy {
/**
* Start the broadcast strategy.
* @param localEmit - Callback to deliver events to SSE listeners on this process
*/
start(localEmit: LocalEmitFn): Promise<void>;

/**
* Broadcast an event to all processes (including this one).
* The strategy is responsible for calling localEmit on every process.
*/
broadcast(organizationId: string, event: SSEEvent): void;

/** Stop the strategy and release resources. */
stop(): Promise<void>;
}

/**
* Local-only broadcast — events are emitted to the current process only.
* Suitable for single-process deployments and local development.
*/
export class LocalSSEBroadcast implements SSEBroadcastStrategy {
private localEmit: LocalEmitFn | null = null;

async start(localEmit: LocalEmitFn): Promise<void> {
this.localEmit = localEmit;
}

broadcast(organizationId: string, event: SSEEvent): void {
this.localEmit?.(organizationId, event);
}

async stop(): Promise<void> {
this.localEmit = null;
}
}
Loading