Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions apps/mesh/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
"quickjs-emscripten-core": "^0.31.0"
},
"devDependencies": {
"@ai-sdk/provider": "^3.0.0",
"@ai-sdk/react": "^3.0.1",
"@ai-sdk/provider": "^3.0.8",
"@ai-sdk/react": "^3.0.103",
"@better-auth/sso": "1.4.1",
"@daveyplate/better-auth-ui": "^3.2.7",
"@deco/ui": "workspace:*",
Expand Down Expand Up @@ -100,7 +100,7 @@
"@untitledui/icons": "^0.0.19",
"@vercel/nft": "^1.1.1",
"@vitejs/plugin-react": "^5.1.0",
"ai": "^6.0.1",
"ai": "^6.0.101",
"babel-plugin-react-compiler": "^1.0.0",
"better-auth": "1.4.5",
"class-variance-authority": "^0.7.1",
Expand Down
79 changes: 77 additions & 2 deletions apps/mesh/src/api/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import {
tracingMiddleware,
} from "../observability";
import authRoutes from "./routes/auth";
import decopilotRoutes from "./routes/decopilot";
import { createDecopilotRoutes } from "./routes/decopilot";
import downstreamTokenRoutes from "./routes/downstream-token";
import virtualMcpRoutes from "./routes/virtual-mcp";
import oauthProxyRoutes, {
Expand All @@ -49,10 +49,26 @@ import {
runPluginStartupHooks,
} from "../core/plugin-loader";
import { CredentialVault } from "../encryption/credential-vault";
import {
LocalCancelBroadcast,
type CancelBroadcast,
} from "./routes/decopilot/cancel-broadcast";
import { createNatsConnectionProvider } from "../nats/connection";
import { NatsCancelBroadcast } from "./routes/decopilot/nats-cancel-broadcast";
import {
NoOpStreamBuffer,
type StreamBuffer,
} from "./routes/decopilot/stream-buffer";
import { NatsStreamBuffer } from "./routes/decopilot/nats-stream-buffer";
import { RunRegistry } from "./routes/decopilot/run-registry";
import { SqlThreadStorage } from "../storage/threads";

// Track current event bus instance for cleanup during HMR
let currentEventBus: EventBus | null = null;

// Track decopilot strategy cleanup (abort active runs, stop strategies) during HMR
let currentDecopilotCleanup: (() => void) | null = null;

// ============================================================================
// Deco Store OAuth Helpers
// ============================================================================
Expand Down Expand Up @@ -163,6 +179,21 @@ export async function createApp(options: CreateAppOptions = {}) {
});
}

// Create shared NATS provider when NATS_URL is set (must init before event bus)
const natsUrl = process.env.NATS_URL;
let natsProvider = natsUrl ? createNatsConnectionProvider() : null;
if (natsProvider) {
try {
await natsProvider.init(natsUrl!);
} catch (err) {
console.warn(
"[NATS] Connection failed, falling back to local-only mode:",
err,
);
natsProvider = null;
}
}

// Create event bus with a lazy context getter
// The notify function needs a context, but the context needs the event bus
// We resolve this by having notify create its own system context
Expand All @@ -180,12 +211,51 @@ export async function createApp(options: CreateAppOptions = {}) {
// Create notify function that uses the context factory
// This is called by the worker to deliver events to subscribers
// EventBus uses the full MeshDatabase (includes Pool for PostgreSQL)
eventBus = createEventBus(database);
eventBus = createEventBus(database, undefined, natsProvider);
}

// Track for cleanup during HMR
currentEventBus = eventBus;

// Decopilot strategy cleanup on HMR / shutdown
if (currentDecopilotCleanup) currentDecopilotCleanup();
const threadStorage = new SqlThreadStorage(database.db);

const runRegistry = new RunRegistry();

const cancelBroadcast: CancelBroadcast = natsProvider
? new NatsCancelBroadcast({
getConnection: () => natsProvider!.getConnection(),
})
: new LocalCancelBroadcast();

const streamBuffer: StreamBuffer = natsProvider
? new NatsStreamBuffer({
getConnection: () => natsProvider!.getConnection(),
getJetStream: () => natsProvider!.getJetStream(),
})
: new NoOpStreamBuffer();

cancelBroadcast
.start((threadId) => runRegistry.cancelLocal(threadId))
.catch((err) => {
console.error("[Decopilot] CancelBroadcast start failed:", err);
});
streamBuffer.init().catch((err) => {
console.warn(
"[Decopilot] StreamBuffer init failed, attach/late-join disabled:",
err,
);
});

currentDecopilotCleanup = () => {
runRegistry.stopAll(threadStorage);
runRegistry.dispose();
cancelBroadcast.stop().catch(() => {});
streamBuffer.teardown();
natsProvider?.drain().catch(() => {});
};

const app = new Hono<Env>();

// ============================================================================
Expand Down Expand Up @@ -633,6 +703,11 @@ export async function createApp(options: CreateAppOptions = {}) {
}
});

const decopilotRoutes = createDecopilotRoutes({
cancelBroadcast,
streamBuffer,
runRegistry,
});
app.route("/api", decopilotRoutes);

// OpenAI-compatible LLM API routes
Expand Down
3 changes: 2 additions & 1 deletion apps/mesh/src/api/routes/decopilot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@
* The actual implementation lives in ./decopilot/routes.ts
*/

export { default } from "./decopilot/routes";
export { createDecopilotRoutes } from "./decopilot/routes";
export type { DecopilotDeps } from "./decopilot/routes";
export type { StreamRequest } from "./decopilot/schemas";
4 changes: 4 additions & 0 deletions apps/mesh/src/api/routes/decopilot/built-in-tools/subtask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ export function createSubtaskTool(
providerMetadata,
});
},
onAbort: () => {
console.error(`[subtask:${agent_id}] Aborted`);
mcpClient.close().catch(() => {});
},
onError: (error) => {
console.error(`[subtask:${agent_id}] Error`, error);
},
Expand Down
41 changes: 41 additions & 0 deletions apps/mesh/src/api/routes/decopilot/cancel-broadcast.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { describe, it, expect } from "bun:test";
import { LocalCancelBroadcast } from "./cancel-broadcast";

describe("LocalCancelBroadcast", () => {
it("start stores the onCancel callback", async () => {
const broadcast = new LocalCancelBroadcast();
const cancelled: string[] = [];

await broadcast.start((id) => cancelled.push(id));
broadcast.broadcast("thread-1");

expect(cancelled).toEqual(["thread-1"]);
});

it("broadcast invokes callback for each call", async () => {
const broadcast = new LocalCancelBroadcast();
const cancelled: string[] = [];

await broadcast.start((id) => cancelled.push(id));
broadcast.broadcast("a");
broadcast.broadcast("b");

expect(cancelled).toEqual(["a", "b"]);
});

it("stop nulls the callback so broadcast is a no-op", async () => {
const broadcast = new LocalCancelBroadcast();
const cancelled: string[] = [];

await broadcast.start((id) => cancelled.push(id));
await broadcast.stop();
broadcast.broadcast("thread-1");

expect(cancelled).toHaveLength(0);
});

it("broadcast before start is a no-op (no throw)", () => {
const broadcast = new LocalCancelBroadcast();
expect(() => broadcast.broadcast("thread-1")).not.toThrow();
});
});
38 changes: 38 additions & 0 deletions apps/mesh/src/api/routes/decopilot/cancel-broadcast.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Cancel Broadcast Interface
*
* Abstraction for how run cancellation is broadcast across pods.
* In single-process mode, cancel is local only.
* In multi-pod deployments, NATS pub/sub propagates cancellation.
*
* Mirrors the SSEBroadcastStrategy pattern from event-bus.
*/

export interface CancelBroadcast {
/** Start listening for cancel broadcasts. When received, call onCancel locally. */
start(onCancel: (threadId: string) => void): Promise<void>;
/** Broadcast a cancellation to all pods (including local). */
broadcast(threadId: string): void;
/** Stop listening and release resources. */
stop(): Promise<void>;
}

/**
* Local-only cancel — cancel only works on the current process.
* Suitable for single-process deployments and when NATS is unavailable.
*/
export class LocalCancelBroadcast implements CancelBroadcast {
private onCancel: ((threadId: string) => void) | null = null;

async start(onCancel: (threadId: string) => void): Promise<void> {
this.onCancel = onCancel;
}

broadcast(threadId: string): void {
this.onCancel?.(threadId);
}

async stop(): Promise<void> {
this.onCancel = null;
}
}
Loading