diff --git a/protocol/nats_jetstream/v3/message.go b/protocol/nats_jetstream/v3/message.go index ff29c7e5..1d6040f9 100644 --- a/protocol/nats_jetstream/v3/message.go +++ b/protocol/nats_jetstream/v3/message.go @@ -17,6 +17,7 @@ import ( "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/binding/format" "github.com/cloudevents/sdk-go/v2/binding/spec" + "github.com/cloudevents/sdk-go/v2/protocol" ) const ( @@ -105,6 +106,35 @@ func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) // Finish *must* be called when message from a Receiver can be forgotten by the receiver. func (m *Message) Finish(err error) error { + // Ack and Nak first checks to see if the message has been acknowleged + // and if Ack/Nak was done, it immediately returns an error without applying any logic to the message on the server. + // Nak will only be sent if the error given is explictly a NACK error(protocol.ResultNACK). + // AckPolicy effects if an explict Ack/Nak is needed. + // AckExplicit: The default policy. Each individual message must be acknowledged. + // Recommended for most reliability and functionality. + // AckNone: No acknowledgment needed; the server assumes acknowledgment on delivery. + // AckAll: Acknowledge only the last message received in a series; all previous messages are automatically acknowledged. + // Will acknowledge all pending messages for all subscribers for Pull Consumer. + // see: github.com/nats-io/nats.go/jetstream/ConsumerConfig.AckPolicy + if m.Msg == nil { + return nil + } + if protocol.IsNACK(err) { + if err = m.Msg.Nak(); err != jetstream.ErrMsgAlreadyAckd { + return err + } + } + if protocol.IsACK(err) { + if err = m.Msg.Ack(); err != jetstream.ErrMsgAlreadyAckd { + return err + } + } + + // In the case that we receive an unknown error, the behavior of whether the message is Ack/Nak + // will be based on the consumer configuration. There are several options such as: + // AckPolicy, AckWait, MaxDeliver, MaxAckPending + // that determine how messages would be redelivered by the server. + // [consumers configuration]: https://docs.nats.io/nats-concepts/jetstream/consumers#configuration return nil } diff --git a/protocol/nats_jetstream/v3/message_test.go b/protocol/nats_jetstream/v3/message_test.go index d532075e..75af4642 100644 --- a/protocol/nats_jetstream/v3/message_test.go +++ b/protocol/nats_jetstream/v3/message_test.go @@ -9,11 +9,13 @@ import ( "bytes" "context" "encoding/json" + "errors" "reflect" "testing" "github.com/cloudevents/sdk-go/v2/binding/spec" bindingtest "github.com/cloudevents/sdk-go/v2/binding/test" + "github.com/cloudevents/sdk-go/v2/protocol" "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/test" @@ -23,11 +25,17 @@ import ( type jetStreamMsg struct { jetstream.Msg - msg *nats.Msg + msg *nats.Msg + ackCalled bool + ackErr error + nackCalled bool + nackErr error } func (j *jetStreamMsg) Data() []byte { return j.msg.Data } func (j *jetStreamMsg) Headers() nats.Header { return j.msg.Header } +func (j *jetStreamMsg) Ack() error { j.ackCalled = true; return j.ackErr } +func (j *jetStreamMsg) Nak() error { j.nackCalled = true; return j.nackErr } var ( outBinaryMessage = bindingtest.MockBinaryMessage{ @@ -190,3 +198,116 @@ func TestGetExtension(t *testing.T) { }) } } + +func TestFinish(t *testing.T) { + type args struct { + err error + ackErr error + nakErr error + } + type wants struct { + err error + ackCalled bool + nackCalled bool + } + tests := []struct { + name string + args args + wants wants + }{ + { + name: "nil error given", + args: args{ + err: nil, + }, + wants: wants{ + err: nil, + ackCalled: true, + nackCalled: false, + }, + }, + { + name: "ACK error given", + args: args{ + err: protocol.ResultACK, + }, + wants: wants{ + err: nil, + ackCalled: true, + nackCalled: false, + }, + }, + { + name: "NACK error given", + args: args{ + err: protocol.ResultNACK, + }, + wants: wants{ + err: nil, + ackCalled: false, + nackCalled: true, + }, + }, + { + name: "unknown error given", + args: args{ + err: errors.New("unknown"), + }, + wants: wants{ + err: nil, + ackCalled: false, + nackCalled: false, + }, + }, + { + name: "jetstream.ErrMsgAlreadyAckd error returned from Ack", + args: args{ + err: protocol.ResultACK, + ackErr: jetstream.ErrMsgAlreadyAckd, + }, + wants: wants{ + err: nil, + ackCalled: true, + nackCalled: false, + }, + }, + { + name: "jetstream.ErrMsgAlreadyAckd error returned from Nak", + args: args{ + err: protocol.ResultNACK, + nakErr: jetstream.ErrMsgAlreadyAckd, + }, + wants: wants{ + err: nil, + ackCalled: false, + nackCalled: true, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + binaryReceiverMessage.ackCalled = false + binaryReceiverMessage.ackErr = tt.args.ackErr + binaryReceiverMessage.nackCalled = false + binaryReceiverMessage.nackErr = tt.args.nakErr + message := NewMessage(binaryReceiverMessage) + if message == nil { + t.Errorf("Error in NewMessage!") + } + gotErr := message.Finish(tt.args.err) + if gotErr != tt.wants.err { + t.Errorf("ExpectedErr %s, while got %s", tt.wants.err, gotErr) + } + var mockMessage *jetStreamMsg + if message != nil { + mockMessage = message.Msg.(*jetStreamMsg) + } + if mockMessage.ackCalled != tt.wants.ackCalled { + t.Errorf("ExpectedAck %t, while got %t", tt.wants.ackCalled, mockMessage.ackCalled) + } + if mockMessage.nackCalled != tt.wants.nackCalled { + t.Errorf("ExpectedNack %t, while got %t", tt.wants.nackCalled, mockMessage.nackCalled) + } + }) + } +} diff --git a/protocol/nats_jetstream/v3/options.go b/protocol/nats_jetstream/v3/options.go index dadbd69c..8ebcba59 100644 --- a/protocol/nats_jetstream/v3/options.go +++ b/protocol/nats_jetstream/v3/options.go @@ -42,7 +42,7 @@ func WithConnection(conn *nats.Conn) ProtocolOption { // WithJetStreamOptions sets jetstream options used in the protocol sender and receiver func WithJetStreamOptions(jetStreamOpts []jetstream.JetStreamOpt) ProtocolOption { return func(p *Protocol) error { - p.jetSteamOpts = jetStreamOpts + p.jetStreamOpts = jetStreamOpts return nil } } diff --git a/protocol/nats_jetstream/v3/options_test.go b/protocol/nats_jetstream/v3/options_test.go index d974f41b..1103d6a2 100644 --- a/protocol/nats_jetstream/v3/options_test.go +++ b/protocol/nats_jetstream/v3/options_test.go @@ -474,7 +474,7 @@ func TestWithJetStreamOptions(t *testing.T) { wants: wants{ err: nil, protocol: &Protocol{ - jetSteamOpts: jetStreamOpts, + jetStreamOpts: jetStreamOpts, }, }, }, diff --git a/protocol/nats_jetstream/v3/protocol.go b/protocol/nats_jetstream/v3/protocol.go index 2babfc42..e78b5000 100644 --- a/protocol/nats_jetstream/v3/protocol.go +++ b/protocol/nats_jetstream/v3/protocol.go @@ -29,8 +29,8 @@ type Protocol struct { natsOpts []nats.Option // jetstream options - jetSteamOpts []jetstream.JetStreamOpt - jetStream jetstream.JetStream + jetStreamOpts []jetstream.JetStreamOpt + jetStream jetstream.JetStream // receiver incoming chan msgErr @@ -76,7 +76,7 @@ func New(ctx context.Context, opts ...ProtocolOption) (*Protocol, error) { } } - if p.jetStream, errConnection = jetstream.New(p.conn, p.jetSteamOpts...); errConnection != nil { + if p.jetStream, errConnection = jetstream.New(p.conn, p.jetStreamOpts...); errConnection != nil { return nil, errConnection }