Skip to content

feat(event-bus): wire SSE broadcast strategy into event bus lifecycle#2503

Merged
pedrofrxncx merged 3 commits intomainfrom
feat/sse-broadcast-strategy
Feb 26, 2026
Merged

feat(event-bus): wire SSE broadcast strategy into event bus lifecycle#2503
pedrofrxncx merged 3 commits intomainfrom
feat/sse-broadcast-strategy

Conversation

@pedrofrxncx
Copy link
Collaborator

@pedrofrxncx pedrofrxncx commented Feb 26, 2026

What is this contribution about?

This PR wires the SSE (Server-Sent Events) broadcast strategy into the event bus lifecycle, ensuring that:

  • SSE hub is properly initialized with the appropriate broadcast strategy when the event bus is created
  • Cross-pod SSE event replication works correctly with NATS
  • Single-pod deployments use local in-memory broadcasting
  • SSE hub can be hot-swapped for HMR support and test isolation

Key changes:

  1. Event Bus Lifecycle Integration: Start SSE hub as part of createEventBus() and when custom EventBus is provided
  2. Flexible Strategy Swapping: Updated SSEHub.start() to support replacing strategies without memory leaks
  3. Race Condition Prevention: Added start promise memoization in NatsSSEBroadcast to prevent concurrent initialization
  4. Input Validation: Strengthened message validation in NATS subscriber to handle malformed payloads safely
  5. Resource Efficiency: Reuse TextEncoder instance in NatsSSEBroadcast

Screenshots/Demonstration

N/A

How to Test

  1. Start the development server with bun run dev
  2. Verify SSE hub initialization logs appear in console:
    • Single pod: [SSEHub] Using local SSE broadcast (single-pod)
    • Multi-pod with NATS: [SSEHub] Using NATS SSE broadcast (cross-pod)
  3. Create event subscriptions and publish events to verify delivery
  4. In multi-pod setup, verify events replicate across pods via NATS
  5. Run tests: bun test (ignoring pre-existing Playwright e2e infrastructure issues)

Migration Notes

N/A - No database migrations or configuration changes required.

Review Checklist

  • PR title is clear and descriptive
  • Changes are tested and working
  • No breaking changes
  • Code formatted with Biome (bun run fmt)
  • Linting passes (bun run lint)
  • Type checking passes (bun run check:ci)
  • Unused exports removed (bun run knip)

Summary by cubic

Wires a pluggable SSE broadcast strategy into the event bus so SSE events fan out across pods via NATS when available, with a local fallback. The SSE hub now starts/stops with the bus and supports hot-swapping for HMR.

  • New Features

    • Start SSE hub in createEventBus with NatsSSEBroadcast when NATS_URL is set; otherwise use LocalSSEBroadcast.
    • Initialize SSE hub even when a custom EventBus is provided.
    • Stop SSE hub during HMR cleanup to avoid leaked connections.
    • SSEHub now uses a pluggable SSEBroadcastStrategy; sseHub and SSEEvent are exported for consumers.
    • Logs clearly indicate single-pod vs cross-pod mode.
  • Bug Fixes

    • Prevent concurrent NATS initialization by memoizing start.
    • Validate and safely ignore malformed NATS messages.

Written for commit 3d7c4f9. Summary will update on new commits.

Introduce SSEBroadcastStrategy interface that decouples event
broadcasting from the SSE hub, enabling cross-pod SSE delivery in
multi-pod deployments.

- SSEBroadcastStrategy interface with start/broadcast/stop lifecycle
- LocalSSEBroadcast: in-memory single-process implementation
- NatsSSEBroadcast: NATS pub/sub for cross-pod replication with
  per-instance origin ID to prevent double-emit on publisher pod
- Refactor SSEHub to delegate broadcast to the pluggable strategy,
  separating localEmit (HTTP stream fan-out) from broadcast routing

Made-with: Cursor
Select SSE broadcast strategy alongside notify strategy at startup:
NATS_URL present → NatsSSEBroadcast, otherwise → LocalSSEBroadcast.

- createEventBus now starts sseHub with the resolved strategy
- app.ts stops sseHub during HMR cleanup to prevent leaked connections

Made-with: Cursor
- Start SSE hub with LocalSSEBroadcast by default when creating EventBus
- Add explicit logging for single-pod vs cross-pod SSE broadcast modes
- Ensure SSE hub initializes even when custom EventBus is provided
- Export SSEEvent and sseHub from event-bus module for public API

Made-with: Cursor
@github-actions
Copy link
Contributor

🧪 Benchmark

Should we run the Virtual MCP strategy benchmark for this PR?

React with 👍 to run the benchmark.

Reaction Action
👍 Run quick benchmark (10 & 128 tools)

Benchmark will run on the next push after you react.

@github-actions
Copy link
Contributor

github-actions bot commented Feb 26, 2026

Release Options

Should a new version be published when this PR is merged?

React with an emoji to vote on the release type:

Reaction Type Next Version
👍 Prerelease 2.116.1-alpha.1
🎉 Patch 2.116.1
❤️ Minor 2.117.0
🚀 Major 3.0.0

Current version: 2.116.0

Deployment

  • Deploy to production (triggers ArgoCD sync after Docker image is published)

Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No issues found across 5 files

@pedrofrxncx pedrofrxncx merged commit fb80eb1 into main Feb 26, 2026
8 checks passed
@pedrofrxncx pedrofrxncx deleted the feat/sse-broadcast-strategy branch February 26, 2026 03:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant