From 615ea2da5b22f521a36b70b6247ccd5717225c92 Mon Sep 17 00:00:00 2001 From: Josemar Luedke Date: Thu, 30 Oct 2025 10:36:01 -0700 Subject: [PATCH 1/2] feat: add synchronous testing utilities for deterministic event testing Add testing package with synchronous implementations of outbox interfaces: - SyncStore: Immediately sends messages instead of storing for async processing - SyncBroker: Routes messages to SyncJetStream for synchronous delivery - SyncJetStream: Delivers messages directly to consumers without NATS Includes comprehensive test suite (9 specs): - Interface compliance verification - Message routing and delivery tests - Wildcard subject matching - End-to-end flow validation Benefits: - Deterministic test execution (no async race conditions) - Immediate event processing (no waiting for background workers) - Full stack traces in test failures - No infrastructure dependencies (NATS, background workers) - Simpler debugging These utilities enable true end-to-end testing of event-driven logic without the complexity and non-determinism of async message processing. --- README.md | 43 +++++ testing/doc.go | 53 +++++ testing/sync_broker.go | 32 ++++ testing/sync_jetstream.go | 394 ++++++++++++++++++++++++++++++++++++++ testing/sync_store.go | 71 +++++++ testing/testing_test.go | 232 ++++++++++++++++++++++ 6 files changed, 825 insertions(+) create mode 100644 testing/doc.go create mode 100644 testing/sync_broker.go create mode 100644 testing/sync_jetstream.go create mode 100644 testing/sync_store.go create mode 100644 testing/testing_test.go diff --git a/README.md b/README.md index 6401f2b..88274b3 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,49 @@ The `outbox.New` constructor accepts several optional configuration functions: You can provide your own store or message broker by implementing the `Store` and `MessageBroker` interfaces found in [outbox.go](outbox.go). +## Testing + +The `testing` package provides synchronous implementations of outbox interfaces for deterministic, race-condition-free testing. Instead of processing messages asynchronously with background workers, these implementations deliver messages synchronously when they're created. + +### Benefits + +- **Deterministic execution** – No async race conditions or timing issues +- **Immediate processing** – Events handled synchronously, no waiting required +- **Full stack traces** – See the complete call chain in test failures +- **No infrastructure** – Tests run without NATS or background workers +- **Simpler debugging** – Step through event handling in debugger + +### Usage Example + +```go +import ( + "github.com/nats-io/nats.go/jetstream" + "github.com/nrfta/go-outbox" + "github.com/nrfta/go-outbox/testing" +) + +// Setup synchronous testing infrastructure +syncJS := testing.NewSyncJetStream() +syncBroker := testing.NewSyncBroker(syncJS) +syncStore := testing.NewSyncStore(syncBroker) + +// Create outbox with sync components +ob, err := outbox.New(syncStore, syncBroker) +if err != nil { + panic(err) +} + +// Override JetStream in your DI container +// Events will now be processed synchronously +``` + +The sync implementations work together: +1. `SyncStore` decodes messages and sends them to the broker immediately (no DB storage) +2. `SyncBroker` routes messages to `SyncJetStream` +3. `SyncJetStream` delivers messages directly to registered consumers + +See [testing/doc.go](testing/doc.go) for detailed documentation. + ## Running tests Integration tests require a running PostgreSQL instance. You can run all tests with: diff --git a/testing/doc.go b/testing/doc.go new file mode 100644 index 0000000..6fbe282 --- /dev/null +++ b/testing/doc.go @@ -0,0 +1,53 @@ +// Package testing provides synchronous test implementations of outbox pattern interfaces. +// +// This package enables deterministic, synchronous event processing in tests by replacing +// asynchronous components (background workers, NATS JetStream) with synchronous equivalents. +// Messages are processed immediately when created, eliminating race conditions and making +// tests more reliable and easier to debug. +// +// # Key Components +// +// SyncStore: Implements outbox.Store but immediately sends messages instead of storing +// them for async processing. Messages are decoded and sent synchronously to the broker. +// +// SyncBroker: Implements outbox.MessageBroker[*nats.Msg] and routes messages to +// SyncJetStream for immediate delivery to consumers. +// +// SyncJetStream: Implements jetstream.JetStream but delivers messages synchronously +// to registered consumers, bypassing NATS infrastructure entirely. +// +// # Usage Example +// +// // Setup synchronous testing infrastructure +// syncJS := testing.NewSyncJetStream() +// syncBroker := testing.NewSyncBroker(syncJS) +// syncStore := testing.NewSyncStore(syncBroker) +// +// // Create outbox with sync components +// outbox, err := outbox.New(syncStore, syncBroker) +// if err != nil { +// panic(err) +// } +// +// // Override DI container with sync JetStream +// do.Override(container, func(i *do.Injector) (jetstream.JetStream, error) { +// return syncJS, nil +// }) +// +// // Events will now be processed synchronously and deterministically +// // No need to wait for async workers or poll for message delivery +// +// # Benefits +// +// - Deterministic execution: No async race conditions +// - Immediate processing: Events handled synchronously when created +// - Full stack traces: See the complete call chain in test failures +// - No infrastructure: Tests run without NATS or background workers +// - Simpler debugging: Step through event handling in debugger +// +// # Trade-offs +// +// This approach tests the business logic but not the async behavior of the real system. +// For integration tests that verify async messaging behavior, use real NATS JetStream +// with testcontainers or similar infrastructure. +package testing diff --git a/testing/sync_broker.go b/testing/sync_broker.go new file mode 100644 index 0000000..2dc2352 --- /dev/null +++ b/testing/sync_broker.go @@ -0,0 +1,32 @@ +package testing + +import ( + "context" + + "github.com/nats-io/nats.go" + "github.com/nrfta/go-outbox" +) + +// SyncBroker implements outbox.MessageBroker[*nats.Msg] for synchronous testing. +// It receives messages from the outbox and immediately routes them to consumer +// handlers via SyncJetStream, enabling deterministic event processing in tests. +type SyncBroker struct { + syncJS *SyncJetStream +} + +// NewSyncBroker creates a synchronous message broker for testing +func NewSyncBroker(syncJS *SyncJetStream) *SyncBroker { + return &SyncBroker{ + syncJS: syncJS, + } +} + +// Send implements outbox.MessageBroker[*nats.Msg] +// Instead of publishing to NATS, it immediately delivers to the sync JetStream +func (b *SyncBroker) Send(ctx context.Context, msg *nats.Msg) error { + // Use the sync JetStream to route and handle the message synchronously + return b.syncJS.SendMessage(ctx, msg.Subject, msg.Data) +} + +// Ensure SyncBroker implements outbox.MessageBroker[*nats.Msg] +var _ outbox.MessageBroker[*nats.Msg] = (*SyncBroker)(nil) diff --git a/testing/sync_jetstream.go b/testing/sync_jetstream.go new file mode 100644 index 0000000..bf21143 --- /dev/null +++ b/testing/sync_jetstream.go @@ -0,0 +1,394 @@ +package testing + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +// SyncJetStream implements jetstream.JetStream for synchronous testing. +// Unlike real NATS JetStream which processes messages asynchronously, this implementation +// delivers messages synchronously directly to consumer handlers. This enables: +// - Deterministic test execution (no race conditions from async message processing) +// - Immediate event processing (no need to wait for async delivery) +// - Simpler test debugging (stack traces show the full call chain) +type SyncJetStream struct { + consumers map[string]*SyncConsumer + mu sync.RWMutex +} + +// NewSyncJetStream creates a new synchronous JetStream for testing +func NewSyncJetStream() *SyncJetStream { + return &SyncJetStream{ + consumers: make(map[string]*SyncConsumer), + } +} + +// CreateOrUpdateConsumer implements jetstream.JetStream +func (s *SyncJetStream) CreateOrUpdateConsumer(ctx context.Context, stream string, cfg jetstream.ConsumerConfig) (jetstream.Consumer, error) { + s.mu.Lock() + defer s.mu.Unlock() + + consumer := &SyncConsumer{ + stream: stream, + subject: cfg.FilterSubject, + } + + // Store consumer using stream and subject as key + key := fmt.Sprintf("%s:%s", stream, cfg.FilterSubject) + s.consumers[key] = consumer + return consumer, nil +} + +// SendMessage delivers a message to the appropriate consumer synchronously +func (s *SyncJetStream) SendMessage(ctx context.Context, subject string, data []byte) error { + s.mu.RLock() + defer s.mu.RUnlock() + + // Find matching consumer by subject + for _, consumer := range s.consumers { + if consumer.matchesSubject(subject) { + msg := &SyncJetstreamMsg{ + subject: subject, + data: data, + } + + // Call the handler synchronously - let panics bubble up to fail tests + if consumer.handler != nil { + consumer.handler(msg) + } + return nil + } + } + + // No consumer found - this is acceptable as not all events have consumers + return nil +} + +// Unimplemented methods required by jetstream.JetStream interface +func (s *SyncJetStream) CreateStream(ctx context.Context, cfg jetstream.StreamConfig) (jetstream.Stream, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) UpdateStream(ctx context.Context, cfg jetstream.StreamConfig) (jetstream.Stream, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) Stream(ctx context.Context, stream string) (jetstream.Stream, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) DeleteStream(ctx context.Context, stream string) error { + return fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) StreamNameBySubject(ctx context.Context, subject string) (string, error) { + return "", fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) Consumer(ctx context.Context, stream, consumer string) (jetstream.Consumer, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) DeleteConsumer(ctx context.Context, stream, consumer string) error { + return fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) AccountInfo(ctx context.Context) (*jetstream.AccountInfo, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) Publish(ctx context.Context, subject string, data []byte, opts ...jetstream.PublishOpt) (*jetstream.PubAck, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) PublishMsg(ctx context.Context, msg *nats.Msg, opts ...jetstream.PublishOpt) (*jetstream.PubAck, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) PublishAsync(subject string, data []byte, opts ...jetstream.PublishOpt) (jetstream.PubAckFuture, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) PublishMsgAsync(msg *nats.Msg, opts ...jetstream.PublishOpt) (jetstream.PubAckFuture, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) PublishAsyncPending() int { + return 0 +} + +func (s *SyncJetStream) PublishAsyncComplete() <-chan struct{} { + ch := make(chan struct{}) + close(ch) + return ch +} + +func (s *SyncJetStream) CreateOrUpdateKeyValue(ctx context.Context, cfg jetstream.KeyValueConfig) (jetstream.KeyValue, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) KeyValue(ctx context.Context, bucket string) (jetstream.KeyValue, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) DeleteKeyValue(ctx context.Context, bucket string) error { + return fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) CreateObjectStore(ctx context.Context, cfg jetstream.ObjectStoreConfig) (jetstream.ObjectStore, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) ObjectStore(ctx context.Context, bucket string) (jetstream.ObjectStore, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) DeleteObjectStore(ctx context.Context, bucket string) error { + return fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) CleanupPublisher() { + // No-op for testing +} + +func (s *SyncJetStream) Conn() *nats.Conn { + return nil +} + +func (s *SyncJetStream) Options() jetstream.JetStreamOptions { + return jetstream.JetStreamOptions{} +} + +func (s *SyncJetStream) CreateConsumer(ctx context.Context, stream string, cfg jetstream.ConsumerConfig) (jetstream.Consumer, error) { + return s.CreateOrUpdateConsumer(ctx, stream, cfg) +} + +func (s *SyncJetStream) UpdateConsumer(ctx context.Context, stream string, cfg jetstream.ConsumerConfig) (jetstream.Consumer, error) { + return s.CreateOrUpdateConsumer(ctx, stream, cfg) +} + +func (s *SyncJetStream) OrderedConsumer(ctx context.Context, stream string, cfg jetstream.OrderedConsumerConfig) (jetstream.Consumer, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) PauseConsumer(ctx context.Context, stream string, consumer string, pauseUntil time.Time) (*jetstream.ConsumerPauseResponse, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) ResumeConsumer(ctx context.Context, stream string, consumer string) (*jetstream.ConsumerPauseResponse, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) CreateOrUpdatePushConsumer(ctx context.Context, stream string, cfg jetstream.ConsumerConfig) (jetstream.PushConsumer, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) CreatePushConsumer(ctx context.Context, stream string, cfg jetstream.ConsumerConfig) (jetstream.PushConsumer, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) UpdatePushConsumer(ctx context.Context, stream string, cfg jetstream.ConsumerConfig) (jetstream.PushConsumer, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) PushConsumer(ctx context.Context, stream string, consumer string) (jetstream.PushConsumer, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncJetStream) CreateKeyValue(ctx context.Context, cfg jetstream.KeyValueConfig) (jetstream.KeyValue, error) { + return s.CreateOrUpdateKeyValue(ctx, cfg) +} + +func (s *SyncJetStream) UpdateKeyValue(ctx context.Context, cfg jetstream.KeyValueConfig) (jetstream.KeyValue, error) { + return s.CreateOrUpdateKeyValue(ctx, cfg) +} + +func (s *SyncJetStream) KeyValueStoreNames(ctx context.Context) jetstream.KeyValueNamesLister { + return nil +} + +func (s *SyncJetStream) KeyValueStores(ctx context.Context) jetstream.KeyValueLister { + return nil +} + +func (s *SyncJetStream) CreateOrUpdateObjectStore(ctx context.Context, cfg jetstream.ObjectStoreConfig) (jetstream.ObjectStore, error) { + return s.CreateObjectStore(ctx, cfg) +} + +func (s *SyncJetStream) UpdateObjectStore(ctx context.Context, cfg jetstream.ObjectStoreConfig) (jetstream.ObjectStore, error) { + return s.CreateObjectStore(ctx, cfg) +} + +func (s *SyncJetStream) ObjectStoreNames(ctx context.Context) jetstream.ObjectStoreNamesLister { + return nil +} + +func (s *SyncJetStream) ObjectStores(ctx context.Context) jetstream.ObjectStoresLister { + return nil +} + +func (s *SyncJetStream) CreateOrUpdateStream(ctx context.Context, cfg jetstream.StreamConfig) (jetstream.Stream, error) { + return s.CreateStream(ctx, cfg) +} + +func (s *SyncJetStream) ListStreams(ctx context.Context, opts ...jetstream.StreamListOpt) jetstream.StreamInfoLister { + return nil +} + +func (s *SyncJetStream) StreamNames(ctx context.Context, opts ...jetstream.StreamListOpt) jetstream.StreamNameLister { + return nil +} + +// SyncConsumer implements jetstream.Consumer for testing +type SyncConsumer struct { + stream string + subject string + handler jetstream.MessageHandler +} + +// Consume implements jetstream.Consumer +func (c *SyncConsumer) Consume(handler jetstream.MessageHandler, opts ...jetstream.PullConsumeOpt) (jetstream.ConsumeContext, error) { + c.handler = handler + return &SyncConsumeContext{closed: make(chan struct{})}, nil +} + +// matchesSubject checks if a message subject matches this consumer's filter +func (c *SyncConsumer) matchesSubject(msgSubject string) bool { + // The consumer's subject filter may contain wildcards like "*" + // Convert to simple matching logic + pattern := c.subject + parts := strings.Split(pattern, ".") + msgParts := strings.Split(msgSubject, ".") + + if len(parts) != len(msgParts) { + return false + } + + for i := range parts { + if parts[i] != "*" && parts[i] != ">" && parts[i] != msgParts[i] { + return false + } + } + + return true +} + +// Unimplemented methods required by jetstream.Consumer interface +func (c *SyncConsumer) Info(ctx context.Context) (*jetstream.ConsumerInfo, error) { + return nil, fmt.Errorf("not implemented") +} + +func (c *SyncConsumer) CachedInfo() *jetstream.ConsumerInfo { + return nil +} + +func (c *SyncConsumer) Fetch(batch int, opts ...jetstream.FetchOpt) (jetstream.MessageBatch, error) { + return nil, fmt.Errorf("not implemented") +} + +func (c *SyncConsumer) FetchBytes(maxBytes int, opts ...jetstream.FetchOpt) (jetstream.MessageBatch, error) { + return nil, fmt.Errorf("not implemented") +} + +func (c *SyncConsumer) FetchNoWait(batch int) (jetstream.MessageBatch, error) { + return nil, fmt.Errorf("not implemented") +} + +func (c *SyncConsumer) Messages(opts ...jetstream.PullMessagesOpt) (jetstream.MessagesContext, error) { + return nil, fmt.Errorf("not implemented") +} + +func (c *SyncConsumer) Next(opts ...jetstream.FetchOpt) (jetstream.Msg, error) { + return nil, fmt.Errorf("not implemented") +} + +// SyncConsumeContext implements jetstream.ConsumeContext +type SyncConsumeContext struct { + closed chan struct{} + once sync.Once +} + +func (c *SyncConsumeContext) Stop() { + c.once.Do(func() { + close(c.closed) + }) +} + +func (c *SyncConsumeContext) Drain() { + c.once.Do(func() { + close(c.closed) + }) +} + +func (c *SyncConsumeContext) Closed() <-chan struct{} { + return c.closed +} + +// SyncJetstreamMsg implements jetstream.Msg for testing +type SyncJetstreamMsg struct { + subject string + data []byte + acked bool +} + +func (m *SyncJetstreamMsg) Data() []byte { + return m.data +} + +func (m *SyncJetstreamMsg) Headers() nats.Header { + return nil +} + +func (m *SyncJetstreamMsg) Subject() string { + return m.subject +} + +func (m *SyncJetstreamMsg) Reply() string { + return "" +} + +func (m *SyncJetstreamMsg) Metadata() (*jetstream.MsgMetadata, error) { + return nil, fmt.Errorf("not implemented") +} + +func (m *SyncJetstreamMsg) Ack() error { + m.acked = true + return nil +} + +func (m *SyncJetstreamMsg) DoubleAck(ctx context.Context) error { + m.acked = true + return nil +} + +func (m *SyncJetstreamMsg) Nak() error { + return nil +} + +func (m *SyncJetstreamMsg) NakWithDelay(delay time.Duration) error { + return nil +} + +func (m *SyncJetstreamMsg) InProgress() error { + return nil +} + +func (m *SyncJetstreamMsg) Term() error { + return nil +} + +func (m *SyncJetstreamMsg) TermWithReason(reason string) error { + return nil +} + +// Ensure types implement their respective interfaces +var _ jetstream.JetStream = (*SyncJetStream)(nil) +var _ jetstream.Consumer = (*SyncConsumer)(nil) +var _ jetstream.ConsumeContext = (*SyncConsumeContext)(nil) +var _ jetstream.Msg = (*SyncJetstreamMsg)(nil) diff --git a/testing/sync_store.go b/testing/sync_store.go new file mode 100644 index 0000000..e200478 --- /dev/null +++ b/testing/sync_store.go @@ -0,0 +1,71 @@ +package testing + +import ( + "bytes" + "context" + "database/sql" + "encoding/gob" + "fmt" + + "github.com/nats-io/nats.go" + "github.com/nrfta/go-outbox" + "github.com/rs/xid" +) + +// SyncStore is a store for testing that immediately sends messages +// instead of storing them for async processing. This enables synchronous +// event processing in tests without the need for background workers. +type SyncStore struct { + broker outbox.MessageBroker[*nats.Msg] +} + +// NewSyncStore creates a new synchronous outbox store for testing +func NewSyncStore(broker outbox.MessageBroker[*nats.Msg]) *SyncStore { + return &SyncStore{ + broker: broker, + } +} + +func (s *SyncStore) CreateRecordTx(ctx context.Context, tx *sql.Tx, record outbox.Record) (*outbox.Record, error) { + // Instead of storing, immediately send the message + if s.broker != nil && len(record.Message) > 0 { + // Decode the gob-encoded NATS message + var msg nats.Msg + decoder := gob.NewDecoder(bytes.NewReader(record.Message)) + if err := decoder.Decode(&msg); err != nil { + return nil, err + } + + if err := s.broker.Send(ctx, &msg); err != nil { + return nil, err + } + } + return &record, nil +} + +func (s *SyncStore) Listen() <-chan xid.ID { + // Return empty channel since we're not listening + ch := make(chan xid.ID) + close(ch) + return ch +} + +func (s *SyncStore) GetWithLock(ctx context.Context, id xid.ID) (*outbox.Record, error) { + return nil, fmt.Errorf("not implemented") +} + +func (s *SyncStore) Delete(ctx context.Context, id xid.ID) error { + return nil +} + +func (s *SyncStore) ProcessTx(ctx context.Context, fn func(outbox.Store) bool) error { + fn(s) + return nil +} + +func (s *SyncStore) Update(ctx context.Context, record *outbox.Record) error { + return nil +} + +// Ensure SyncStore implements outbox.Store +var _ outbox.Store = (*SyncStore)(nil) diff --git a/testing/testing_test.go b/testing/testing_test.go new file mode 100644 index 0000000..95da8f5 --- /dev/null +++ b/testing/testing_test.go @@ -0,0 +1,232 @@ +package testing_test + +import ( + "bytes" + "context" + "encoding/gob" + "testing" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/nrfta/go-outbox" + outboxTesting "github.com/nrfta/go-outbox/testing" +) + +func TestTestingSuite(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Testing Suite") +} + +var _ = Describe("Sync Testing Utilities", func() { + var ( + syncJS *outboxTesting.SyncJetStream + syncBroker *outboxTesting.SyncBroker + syncStore *outboxTesting.SyncStore + ctx context.Context + ) + + BeforeEach(func() { + ctx = context.Background() + syncJS = outboxTesting.NewSyncJetStream() + syncBroker = outboxTesting.NewSyncBroker(syncJS) + syncStore = outboxTesting.NewSyncStore(syncBroker) + }) + + Describe("SyncStore", func() { + It("implements outbox.Store interface", func() { + var _ outbox.Store = syncStore + }) + + It("immediately sends messages to broker when CreateRecordTx is called", func() { + subject := "test.subject" + data := []byte("test data") + messageReceived := false + + // Setup consumer to capture message + consumer, err := syncJS.CreateOrUpdateConsumer(ctx, "test-stream", jetstream.ConsumerConfig{ + FilterSubject: subject, + }) + Expect(err).ToNot(HaveOccurred()) + + // Register handler + _, err = consumer.Consume(func(msg jetstream.Msg) { + Expect(msg.Subject()).To(Equal(subject)) + Expect(msg.Data()).To(Equal(data)) + messageReceived = true + msg.Ack() + }) + Expect(err).ToNot(HaveOccurred()) + + // Create NATS message and encode it (like outbox does) + msg := &nats.Msg{ + Subject: subject, + Data: data, + } + + var buf bytes.Buffer + encoder := gob.NewEncoder(&buf) + err = encoder.Encode(msg) + Expect(err).ToNot(HaveOccurred()) + + // Create record and send via store + record := outbox.Record{ + Message: buf.Bytes(), + } + + _, err = syncStore.CreateRecordTx(ctx, nil, record) + Expect(err).ToNot(HaveOccurred()) + + // Message should be immediately received + Expect(messageReceived).To(BeTrue()) + }) + }) + + Describe("SyncBroker", func() { + It("implements outbox.MessageBroker interface", func() { + var _ outbox.MessageBroker[*nats.Msg] = syncBroker + }) + + It("routes messages to SyncJetStream", func() { + subject := "test.routing" + data := []byte("routing test") + messageReceived := false + + // Setup consumer + consumer, err := syncJS.CreateOrUpdateConsumer(ctx, "test-stream", jetstream.ConsumerConfig{ + FilterSubject: subject, + }) + Expect(err).ToNot(HaveOccurred()) + + // Setup handler + _, err = consumer.Consume(func(msg jetstream.Msg) { + Expect(msg.Subject()).To(Equal(subject)) + Expect(msg.Data()).To(Equal(data)) + messageReceived = true + msg.Ack() + }) + Expect(err).ToNot(HaveOccurred()) + + // Send message via broker + msg := &nats.Msg{ + Subject: subject, + Data: data, + } + err = syncBroker.Send(ctx, msg) + Expect(err).ToNot(HaveOccurred()) + Expect(messageReceived).To(BeTrue()) + }) + }) + + Describe("SyncJetStream", func() { + It("implements jetstream.JetStream interface", func() { + var _ jetstream.JetStream = syncJS + }) + + It("delivers messages to consumers synchronously", func() { + subject := "test.sync" + data := []byte("sync test") + messageReceived := false + + // Create consumer + consumer, err := syncJS.CreateOrUpdateConsumer(ctx, "test-stream", jetstream.ConsumerConfig{ + FilterSubject: subject, + }) + Expect(err).ToNot(HaveOccurred()) + + // Register handler + _, err = consumer.Consume(func(msg jetstream.Msg) { + Expect(msg.Subject()).To(Equal(subject)) + Expect(msg.Data()).To(Equal(data)) + messageReceived = true + msg.Ack() + }) + Expect(err).ToNot(HaveOccurred()) + + // Send message - should be delivered immediately and synchronously + err = syncJS.SendMessage(ctx, subject, data) + Expect(err).ToNot(HaveOccurred()) + + // Message should already be received (no waiting needed) + Expect(messageReceived).To(BeTrue()) + }) + + It("matches wildcard subjects correctly", func() { + wildcard := "events.*" + specificSubject := "events.created" + messageReceived := false + + // Create consumer with wildcard + consumer, err := syncJS.CreateOrUpdateConsumer(ctx, "test-stream", jetstream.ConsumerConfig{ + FilterSubject: wildcard, + }) + Expect(err).ToNot(HaveOccurred()) + + // Register handler + _, err = consumer.Consume(func(msg jetstream.Msg) { + Expect(msg.Subject()).To(Equal(specificSubject)) + messageReceived = true + msg.Ack() + }) + Expect(err).ToNot(HaveOccurred()) + + // Send message with specific subject + err = syncJS.SendMessage(ctx, specificSubject, []byte("test")) + Expect(err).ToNot(HaveOccurred()) + + Expect(messageReceived).To(BeTrue()) + }) + + It("handles no matching consumer gracefully", func() { + // Send message with no consumer registered + err := syncJS.SendMessage(ctx, "unregistered.subject", []byte("test")) + Expect(err).ToNot(HaveOccurred()) + }) + }) + + Describe("End-to-end flow", func() { + It("processes messages synchronously from store to consumer", func() { + subject := "e2e.test" + data := []byte("end to end test") + messageReceived := false + + // Setup consumer + consumer, err := syncJS.CreateOrUpdateConsumer(ctx, "e2e-stream", jetstream.ConsumerConfig{ + FilterSubject: subject, + }) + Expect(err).ToNot(HaveOccurred()) + + // Register handler + _, err = consumer.Consume(func(msg jetstream.Msg) { + Expect(msg.Subject()).To(Equal(subject)) + Expect(msg.Data()).To(Equal(data)) + messageReceived = true + msg.Ack() + }) + Expect(err).ToNot(HaveOccurred()) + + // Create and encode message + msg := &nats.Msg{ + Subject: subject, + Data: data, + } + + var buf bytes.Buffer + encoder := gob.NewEncoder(&buf) + err = encoder.Encode(msg) + Expect(err).ToNot(HaveOccurred()) + + // Send via store (simulates full outbox flow) + record := outbox.Record{ + Message: buf.Bytes(), + } + _, err = syncStore.CreateRecordTx(ctx, nil, record) + Expect(err).ToNot(HaveOccurred()) + + // Message should be immediately processed + Expect(messageReceived).To(BeTrue()) + }) + }) +}) From 36e92449a6111c8a4bb0af1b3708bda65a535f81 Mon Sep 17 00:00:00 2001 From: Josemar Luedke Date: Thu, 30 Oct 2025 10:39:20 -0700 Subject: [PATCH 2/2] docs: clarify transaction semantics limitation in SyncStor --- testing/doc.go | 21 +++++++++++++++++---- testing/sync_store.go | 17 +++++++++++++++++ 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/testing/doc.go b/testing/doc.go index 6fbe282..37e1f60 100644 --- a/testing/doc.go +++ b/testing/doc.go @@ -45,9 +45,22 @@ // - No infrastructure: Tests run without NATS or background workers // - Simpler debugging: Step through event handling in debugger // -// # Trade-offs +// # Important Limitations // -// This approach tests the business logic but not the async behavior of the real system. -// For integration tests that verify async messaging behavior, use real NATS JetStream -// with testcontainers or similar infrastructure. +// Transaction Semantics: SyncStore sends messages immediately when CreateRecordTx is +// called, BEFORE the transaction commits. This differs from the real outbox pattern: +// +// Real Outbox: Store → Commit → NOTIFY → Worker → Send +// SyncStore: Send → Store (no-op) → Commit (too late) +// +// This means messages are sent even if the transaction is later rolled back, violating +// transactional guarantees. This is acceptable for most test scenarios where: +// - Tests use transaction-per-test isolation (go-txdb) +// - Focus is on business logic, not transaction edge cases +// - Rollback scenarios with events are not being tested +// +// Async Behavior: This approach tests business logic but not async behavior of the +// real system. For integration tests that verify async messaging, transaction rollback +// scenarios, or true transactional semantics, use real PostgreSQL outbox store with +// NATS JetStream via testcontainers. package testing diff --git a/testing/sync_store.go b/testing/sync_store.go index e200478..a1242b6 100644 --- a/testing/sync_store.go +++ b/testing/sync_store.go @@ -15,6 +15,23 @@ import ( // SyncStore is a store for testing that immediately sends messages // instead of storing them for async processing. This enables synchronous // event processing in tests without the need for background workers. +// +// IMPORTANT: Messages are sent immediately when CreateRecordTx is called, +// BEFORE the transaction commits. This differs from the real outbox pattern +// where messages are only sent after successful transaction commit. +// +// This means: +// - If a transaction is rolled back, messages are still sent +// - Transaction isolation guarantees do not apply to message delivery +// - Tests cannot verify rollback behavior with event publishing +// +// This limitation is acceptable for most testing scenarios where: +// - Tests use transaction-per-test isolation (go-txdb) +// - Focus is on business logic, not transaction edge cases +// - Rollback scenarios with events are not being tested +// +// For tests that need true transactional semantics with events, +// use the real PostgreSQL outbox store with testcontainers. type SyncStore struct { broker outbox.MessageBroker[*nats.Msg] }