Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,49 @@ The `outbox.New` constructor accepts several optional configuration functions:

You can provide your own store or message broker by implementing the `Store` and `MessageBroker` interfaces found in [outbox.go](outbox.go).

## Testing

The `testing` package provides synchronous implementations of outbox interfaces for deterministic, race-condition-free testing. Instead of processing messages asynchronously with background workers, these implementations deliver messages synchronously when they're created.

### Benefits

- **Deterministic execution** – No async race conditions or timing issues
- **Immediate processing** – Events handled synchronously, no waiting required
- **Full stack traces** – See the complete call chain in test failures
- **No infrastructure** – Tests run without NATS or background workers
- **Simpler debugging** – Step through event handling in debugger

### Usage Example

```go
import (
"github.com/nats-io/nats.go/jetstream"
"github.com/nrfta/go-outbox"
"github.com/nrfta/go-outbox/testing"
)

// Setup synchronous testing infrastructure
syncJS := testing.NewSyncJetStream()
syncBroker := testing.NewSyncBroker(syncJS)
syncStore := testing.NewSyncStore(syncBroker)

// Create outbox with sync components
ob, err := outbox.New(syncStore, syncBroker)
if err != nil {
panic(err)
}

// Override JetStream in your DI container
// Events will now be processed synchronously
```

The sync implementations work together:
1. `SyncStore` decodes messages and sends them to the broker immediately (no DB storage)
2. `SyncBroker` routes messages to `SyncJetStream`
3. `SyncJetStream` delivers messages directly to registered consumers

See [testing/doc.go](testing/doc.go) for detailed documentation.

## Running tests

Integration tests require a running PostgreSQL instance. You can run all tests with:
Expand Down
66 changes: 66 additions & 0 deletions testing/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Package testing provides synchronous test implementations of outbox pattern interfaces.
//
// This package enables deterministic, synchronous event processing in tests by replacing
// asynchronous components (background workers, NATS JetStream) with synchronous equivalents.
// Messages are processed immediately when created, eliminating race conditions and making
// tests more reliable and easier to debug.
//
// # Key Components
//
// SyncStore: Implements outbox.Store but immediately sends messages instead of storing
// them for async processing. Messages are decoded and sent synchronously to the broker.
//
// SyncBroker: Implements outbox.MessageBroker[*nats.Msg] and routes messages to
// SyncJetStream for immediate delivery to consumers.
//
// SyncJetStream: Implements jetstream.JetStream but delivers messages synchronously
// to registered consumers, bypassing NATS infrastructure entirely.
//
// # Usage Example
//
// // Setup synchronous testing infrastructure
// syncJS := testing.NewSyncJetStream()
// syncBroker := testing.NewSyncBroker(syncJS)
// syncStore := testing.NewSyncStore(syncBroker)
//
// // Create outbox with sync components
// outbox, err := outbox.New(syncStore, syncBroker)
// if err != nil {
// panic(err)
// }
//
// // Override DI container with sync JetStream
// do.Override(container, func(i *do.Injector) (jetstream.JetStream, error) {
// return syncJS, nil
// })
//
// // Events will now be processed synchronously and deterministically
// // No need to wait for async workers or poll for message delivery
//
// # Benefits
//
// - Deterministic execution: No async race conditions
// - Immediate processing: Events handled synchronously when created
// - Full stack traces: See the complete call chain in test failures
// - No infrastructure: Tests run without NATS or background workers
// - Simpler debugging: Step through event handling in debugger
//
// # Important Limitations
//
// Transaction Semantics: SyncStore sends messages immediately when CreateRecordTx is
// called, BEFORE the transaction commits. This differs from the real outbox pattern:
//
// Real Outbox: Store → Commit → NOTIFY → Worker → Send
// SyncStore: Send → Store (no-op) → Commit (too late)
//
// This means messages are sent even if the transaction is later rolled back, violating
// transactional guarantees. This is acceptable for most test scenarios where:
// - Tests use transaction-per-test isolation (go-txdb)
// - Focus is on business logic, not transaction edge cases
// - Rollback scenarios with events are not being tested
//
// Async Behavior: This approach tests business logic but not async behavior of the
// real system. For integration tests that verify async messaging, transaction rollback
// scenarios, or true transactional semantics, use real PostgreSQL outbox store with
// NATS JetStream via testcontainers.
package testing
32 changes: 32 additions & 0 deletions testing/sync_broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package testing

import (
"context"

"github.com/nats-io/nats.go"
"github.com/nrfta/go-outbox"
)

// SyncBroker implements outbox.MessageBroker[*nats.Msg] for synchronous testing.
// It receives messages from the outbox and immediately routes them to consumer
// handlers via SyncJetStream, enabling deterministic event processing in tests.
type SyncBroker struct {
syncJS *SyncJetStream
}

// NewSyncBroker creates a synchronous message broker for testing
func NewSyncBroker(syncJS *SyncJetStream) *SyncBroker {
return &SyncBroker{
syncJS: syncJS,
}
}

// Send implements outbox.MessageBroker[*nats.Msg]
// Instead of publishing to NATS, it immediately delivers to the sync JetStream
func (b *SyncBroker) Send(ctx context.Context, msg *nats.Msg) error {
// Use the sync JetStream to route and handle the message synchronously
return b.syncJS.SendMessage(ctx, msg.Subject, msg.Data)
}

// Ensure SyncBroker implements outbox.MessageBroker[*nats.Msg]
var _ outbox.MessageBroker[*nats.Msg] = (*SyncBroker)(nil)
Loading
Loading