Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions messaging/natsjscm/natsjscm.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/sanity-io/litter"
"github.com/simiancreative/simiango/logger"
)

Expand Down Expand Up @@ -152,8 +151,6 @@ func (cm *ConnectionManager) Connect() error {
return nil
}

litter.Dump(cm.config)

// Connect to NATS
nc, err := nats.Connect(cm.config.URL, cm.config.Options...)
if err != nil {
Expand Down
47 changes: 31 additions & 16 deletions messaging/natsjsdlq/natsjsdlq.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package natsjsdlq

import (
"context"
"fmt"
"time"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/simiancreative/simiango/messaging/natsjscm"
"github.com/simiancreative/simiango/messaging/natsjspub"
)

type JetStreamContext interface {
AddStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error)
PublishMsg(msg *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error)
}

type Msg interface {
Metadata() (*nats.MsgMetadata, error)
}
Expand All @@ -28,20 +27,26 @@ type Config struct {
MaxDeliveries int

// Storage type for the DLQ stream
Storage nats.StorageType
Storage jetstream.StorageType

// Optional handler for DLQ errors
ErrorHandler func(error)

// Context for the DLQ handler
Context context.Context
}

type Dependencies struct {
JetStream JetStreamContext
ConnectionManager natsjscm.Connector
Publisher natsjspub.Publisher
}

// Handler manages dead letter queue operations
type Handler struct {
config Config
js JetStreamContext
cm natsjscm.Connector
p natsjspub.Publisher
ctx context.Context
}

// NewHandler creates a new DLQ handler
Expand All @@ -52,7 +57,9 @@ func NewHandler(deps Dependencies, config Config) (*Handler, error) {

handler := &Handler{
config: config,
js: deps.JetStream,
ctx: config.Context,
cm: deps.ConnectionManager,
p: deps.Publisher,
}

if err := handler.setup(); err != nil {
Expand All @@ -63,8 +70,12 @@ func NewHandler(deps Dependencies, config Config) (*Handler, error) {
}

func validateConfig(deps Dependencies, config Config) error {
if deps.JetStream == nil {
return fmt.Errorf("JetStream context is required")
if deps.ConnectionManager == nil {
return fmt.Errorf("connection manager is required")
}

if deps.Publisher == nil {
return fmt.Errorf("publisher is required")
}

if config.StreamName == "" {
Expand All @@ -80,22 +91,26 @@ func validateConfig(deps Dependencies, config Config) error {
}

if config.Storage == 0 {
config.Storage = nats.FileStorage
config.Storage = jetstream.FileStorage
}

if config.Context == nil {
config.Context = context.Background()
}

return nil
}

// setup ensures the DLQ stream exists
func (h *Handler) setup() error {
streamConfig := &nats.StreamConfig{
streamConfig := jetstream.StreamConfig{
Name: h.config.StreamName,
Subjects: []string{h.config.Subject},
Storage: h.config.Storage,
Retention: nats.WorkQueuePolicy,
Retention: jetstream.WorkQueuePolicy,
}

_, err := h.js.AddStream(streamConfig)
_, err := h.cm.EnsureStream(h.ctx, streamConfig)
if err != nil && err != nats.ErrStreamNameAlreadyInUse {
return fmt.Errorf("failed to create DLQ stream: %w", err)
}
Expand Down Expand Up @@ -126,7 +141,7 @@ func (h *Handler) PublishMessage(msg *nats.Msg, reason string) error {
dlqMsg.Data = msg.Data

// Publish to DLQ
_, err := h.js.PublishMsg(dlqMsg)
_, err := h.p.Publish(h.ctx, dlqMsg)
if err != nil && h.config.ErrorHandler != nil {
h.config.ErrorHandler(fmt.Errorf("failed to publish to DLQ: %w", err))
}
Expand Down
Loading