Skip to content

Background#2508

Merged
pedrofrxncx merged 12 commits intomainfrom
background
Feb 27, 2026
Merged

Background#2508
pedrofrxncx merged 12 commits intomainfrom
background

Conversation

@pedrofrxncx
Copy link
Collaborator

@pedrofrxncx pedrofrxncx commented Feb 26, 2026

What is this contribution about?

Describe your changes and why they're needed.

Screenshots/Demonstration

Add screenshots or a Loom video if your changes affect the UI.

How to Test

Provide step-by-step instructions for reviewers to test your changes:

  1. Step one
  2. Step two
  3. Expected outcome

Migration Notes

If this PR requires database migrations, configuration changes, or other setup steps, document them here. Remove this section if not applicable.

Review Checklist

  • PR title is clear and descriptive
  • Changes are tested and working
  • Documentation is updated (if needed)
  • No breaking changes

Summary by cubic

Adds distributed Decopilot run control with cross-pod cancel and late-join stream replay over NATS/JetStream via a shared NATS connection used by Event Bus and Decopilot. Runs continue in the background after disconnect; the UI and SDK show live status, step counts, and cancel controls with local fallbacks when NATS is unavailable.

  • New Features

    • Shared NATS connection provider used by Event Bus (SSE/notify), cancel broadcast, and JetStream relay.
    • New endpoints: POST /:org/decopilot/:threadId/cancel and GET /:org/decopilot/:threadId/attach; re‑attach to in‑progress runs.
    • Cross‑pod cancel via NATS (LocalCancelBroadcast fallback).
    • JetStream buffer for /attach replay with per‑thread limits and explicit purge.
    • In‑memory RunRegistry; runs survive client disconnects with stale‑run reaping.
    • SDK exports typed Decopilot SSE events and thread status/display types (includes “expired”); useDecopilotEvents hook powers live updates (tasks show step count; chat input shows cancel).
    • Shared SSE subscription utility reused by workflow and chat.
    • Conversation: denyPendingApprovals now patches only when needed and returns the original array when unchanged; added tests.
    • Chat: reduced re‑renders by separating stable vs. streaming values; moved tiptapDoc to local state; added stable hook for agent/thread UI; always persists streamed messages into the thread cache; notifications fire whenever enabled.
    • Tasks: updated to use the stable chat context for consistency.
    • Thread status resolution simplified (stop -> completed); canonical ThreadStatus moved to SDK and reused server/UI.
    • Internal: routes moved to a DI factory; validateThreadOwnership checks; subtask tool closes MCP on abort; Event Bus uses the shared NATS provider.
    • Dependencies: bumped ai SDK packages (@ai-sdk/provider, @ai-sdk/react, ai) to v6.0.101.
  • Migration

    • Configure NATS server URL(s) and enable JetStream for cross‑pod cancel and replay; otherwise behavior remains local‑only.
    • No database migrations.

Written for commit 342f791. Summary will update on new commits.

@github-actions
Copy link
Contributor

🧪 Benchmark

Should we run the Virtual MCP strategy benchmark for this PR?

React with 👍 to run the benchmark.

Reaction Action
👍 Run quick benchmark (10 & 128 tools)

Benchmark will run on the next push after you react.

@github-actions
Copy link
Contributor

github-actions bot commented Feb 26, 2026

Release Options

Should a new version be published when this PR is merged?

React with an emoji to vote on the release type:

Reaction Type Next Version
👍 Prerelease 2.119.1-alpha.1
🎉 Patch 2.119.1
❤️ Minor 2.120.0
🚀 Major 3.0.0

Current version: 2.119.0

Deployment

  • Deploy to production (triggers ArgoCD sync after Docker image is published)

Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5 issues found across 33 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="apps/mesh/src/api/routes/decopilot/nats-stream-buffer.ts">

<violation number="1" location="apps/mesh/src/api/routes/decopilot/nats-stream-buffer.ts:119">
P2: The abort event listener added on `abortSignal` is never removed after the stream completes normally via `flush()`. This keeps the `publishDone` closure (and its captured `js`, `encoder`, `subj`) alive until the signal is GC'd. Consider removing the listener in `flush()`, or use `{ once: true }` combined with manual removal.</violation>
</file>

<file name="apps/mesh/src/web/components/chat/input.tsx">

<violation number="1" location="apps/mesh/src/web/components/chat/input.tsx:304">
P2: Avoid calling setTiptapDocLocal during render when the thread changes; move the reset into a useEffect tied to activeThreadId to prevent render-phase state updates.</violation>
</file>

<file name="apps/mesh/src/api/routes/decopilot/nats-cancel-broadcast.ts">

<violation number="1" location="apps/mesh/src/api/routes/decopilot/nats-cancel-broadcast.ts:30">
P2: start() returns before assigning onCancel when getConnection() is null, so local cancellations won’t fire if NATS isn’t available. Set onCancel before the early return so local cancel still works even without NATS.</violation>
</file>

<file name="apps/mesh/src/api/routes/decopilot/routes.ts">

<violation number="1" location="apps/mesh/src/api/routes/decopilot/routes.ts:544">
P2: `requestMessage` is re-saved (with new `created_at`/`updated_at` timestamps) in `onFinish` and `onStepFinish`, even though it was already persisted before streaming started. This overwrites the original creation timestamp and performs redundant writes. Consider only saving `responseMessage` in the subsequent calls, or skipping `requestMessage` if it was already persisted.</violation>

<violation number="2" location="apps/mesh/src/api/routes/decopilot/routes.ts:569">
P2: The `else if (memory)` branch in the catch block is unreachable dead code. `failThread` is always assigned immediately after `memory = mem` (with only a non-throwing function definition between them), so whenever `memory` is truthy, `failThread` is also truthy and the `if (failThread)` branch executes instead. This duplicates failure-handling logic and creates a risk of the two copies diverging.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 issue found across 3 files (changes from recent commits).

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="apps/mesh/src/api/routes/decopilot/routes.ts">

<violation number="1" location="apps/mesh/src/api/routes/decopilot/routes.ts:500">
P2: The finish handler no longer retries saving the request message. Because saveMessagesToThread swallows errors, a transient failure before streaming can leave the thread missing the user message while the response is saved. Consider re-including requestMessage here to retry persistence.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

Upgrade @ai-sdk/provider, @ai-sdk/react, and ai packages to latest
minor versions for bug fixes and streaming improvements.

Made-with: Cursor
Define typed CloudEvent schemas for decopilot real-time events:
- step (tool calls, text deltas, reasoning)
- finish (run completed/failed/cancelled)
- thread-status (running/idle/failed)

These types are consumed by the frontend SSE hooks and produced
by the decopilot stream routes.

Made-with: Cursor
Move NATS connection lifecycle into a dedicated NatsConnectionProvider
so that event-bus, cancel-broadcast, and stream-buffer can share a
single connection instead of each managing their own.

Event bus factories now accept an optional NatsConnectionProvider
rather than creating connections internally.

Made-with: Cursor
… and run tracking

Introduce three strategy interfaces for horizontal scaling:

- CancelBroadcast: propagate run cancellation across pods
  (LocalCancelBroadcast for single-process, NatsCancelBroadcast for multi-pod)
- StreamBuffer: replay AI stream chunks for late-joining clients
  (NoOpStreamBuffer for single-process, NatsStreamBuffer via JetStream)
- RunRegistry: track active runs per-pod with abort controller lifecycle

Each abstraction has a local fallback that works without NATS,
making the system progressively enhanceable.

Made-with: Cursor
… support

Major restructure of the decopilot routes module:

- Convert default export to createDecopilotRoutes() factory that
  accepts CancelBroadcast, StreamBuffer, and RunRegistry via DI
- Runs now continue in the background after SSE disconnect, enabling
  tool calls to complete even when the client navigates away
- Add cancel endpoint (POST /:org/decopilot/:threadId/cancel) that
  broadcasts cancellation across pods
- Add attach endpoint (GET /:org/decopilot/:threadId/attach) for
  reconnecting to an in-progress run's stream
- Emit typed SSE events (step/finish/thread-status) via sseHub
- Optimize denyPendingApprovals to only scan the last assistant message
- Extract validateThreadOwnership helper for thread-scoped endpoints
- Wire NATS provider, strategies, and HMR cleanup in app.ts

Made-with: Cursor
Split chat frontend into focused modules:

- createSseSubscription: generic SSE → callback wiring (reused by
  workflow SSE and decopilot events)
- useDecopilotEvents: subscribes to step/finish/thread-status SSE
  events and updates chat state in real-time
- ChatState: lightweight zustand-like store for transient UI state
  (isRunning, current thread status) separate from server cache

Chat context no longer manages streaming lifecycle directly — it
delegates to the background run + SSE event model. The input component
reacts to isRunning to show cancel affordance.

Made-with: Cursor
@pedrofrxncx
Copy link
Collaborator Author

@cubic-ai

@cubic-dev-ai
Copy link
Contributor

cubic-dev-ai bot commented Feb 26, 2026

@cubic-ai

@pedrofrxncx I have started the AI code review. It will take a few minutes to complete.

Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

6 issues found across 40 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="apps/mesh/src/web/hooks/create-sse-subscription.ts">

<violation number="1" location="apps/mesh/src/web/hooks/create-sse-subscription.ts:65">
P2: Make the unsubscribe function idempotent; otherwise multiple calls can decrement refCount too far and close the shared EventSource while other subscribers are still active.</violation>
</file>

<file name="apps/mesh/src/web/components/chat/context.tsx">

<violation number="1" location="apps/mesh/src/web/components/chat/context.tsx:742">
P2: `resumeFailCountRef` is never reset when the active thread changes. If 3 resume failures accumulate on one thread, switching to a different in-progress thread will be unable to resume its stream. Reset the counter when the active thread changes.</violation>
</file>

<file name="apps/mesh/src/event-bus/nats-sse-broadcast.ts">

<violation number="1" location="apps/mesh/src/event-bus/nats-sse-broadcast.ts:40">
P2: start() now returns before assigning localEmit when no NATS connection is available, so local SSE broadcasts are silently dropped in the NATS-unavailable path. Set localEmit before the NATS availability check so local delivery still works even without a connection.</violation>
</file>

<file name="apps/mesh/src/web/components/chat/input.tsx">

<violation number="1" location="apps/mesh/src/web/components/chat/input.tsx:304">
P2: Avoid setting state during render when the thread changes. Move the reset into a useEffect keyed on activeThreadId to prevent render-time state updates and potential render loops.</violation>
</file>

<file name="apps/mesh/src/api/routes/decopilot/routes.ts">

<violation number="1" location="apps/mesh/src/api/routes/decopilot/routes.ts:530">
P1: Bug: `onStepFinish` is not a valid parameter of `createUIMessageStream` — it will be silently ignored. According to the AI SDK docs, `onStepFinish` is only supported on `streamText`/`generateText`. This means step counting, SSE step events, and periodic intermediate saves will never execute.

Move `onStepFinish` to the `streamText()` call (which already supports it) to restore step tracking.</violation>
</file>

<file name="apps/mesh/src/api/routes/decopilot/nats-stream-buffer.ts">

<violation number="1" location="apps/mesh/src/api/routes/decopilot/nats-stream-buffer.ts:179">
P2: Timer leak: each `pull()` iteration creates a `setTimeout` (30 s) via `Promise.race` that is never cleared when `iter.next()` wins the race. For a replay of many messages, this accumulates thousands of lingering timers. Store the timer ID and `clearTimeout` it after the race resolves.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

- Added a timer to ensure proper cleanup in NatsStreamBuffer during pull operations.
- Enhanced abort signal handling in createDecopilotRoutes to prevent unnecessary updates.
- Updated ChatProvider to persist partial messages in the thread cache, preventing UI inconsistencies during message source switches.
- Refactored SSE subscription cleanup logic to prevent multiple unsubscriptions.

These changes enhance the reliability and performance of the chat and streaming functionalities.
- Refactored denyPendingApprovals to handle multiple assistant messages and return the same reference when no changes are needed.
- Added unit tests to verify the behavior of denyPendingApprovals across various scenarios, ensuring correct state updates for tool calls.
- Updated Chat context to improve stability and reduce unnecessary re-renders by separating stable and streaming values.

These changes improve the reliability of message handling in the chat system.
Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 issue found across 10 files (changes from recent commits).

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="apps/mesh/src/web/components/chat/context.tsx">

<violation number="1" location="apps/mesh/src/web/components/chat/context.tsx:816">
P2: Avoid mutating refs during render; move the thread-switch reset into an effect so concurrent renders can’t leave the resume guards in an inconsistent state.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

- Replaced the import of useChatStable from the chat component to the context module, enhancing the organization of chat-related hooks.
- This change aims to streamline the chat functionality and improve overall stability in the task management interface.
- Updated ChatProvider to always persist streamed messages into the thread cache, preventing UI inconsistencies when switching message sources.
- Refined notification logic to trigger only when the finish reason is "stop", improving user experience during chat interactions.

These changes improve the reliability of message handling and reduce unnecessary UI flashes.
Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 issue found across 1 file (changes from recent commits).

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="apps/mesh/src/web/components/chat/context.tsx">

<violation number="1" location="apps/mesh/src/web/components/chat/context.tsx:673">
P2: This new `finishReason === "stop"` guard suppresses notifications for normal finishes. Per prior guidance, notifications should only be suppressed when `isAbort`, `isDisconnect`, or `isError` is true; those cases already return earlier. Remove the finishReason gating so notifications fire for all successful finishes.

(Based on your team's feedback about not suppressing notifications based on finishReason.) [FEEDBACK_USED]</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

- Modified the notification handling in ChatProvider to trigger notifications whenever enabled, regardless of finish reason. This change simplifies the notification logic and ensures users are consistently informed during chat interactions.

This update enhances the user experience by providing timely notifications.
@pedrofrxncx pedrofrxncx merged commit aac8174 into main Feb 27, 2026
8 checks passed
@pedrofrxncx pedrofrxncx deleted the background branch February 27, 2026 00:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant