diff --git a/protocol/nats_jetstream/v3/go.mod b/protocol/nats_jetstream/v3/go.mod new file mode 100644 index 000000000..2b20a3062 --- /dev/null +++ b/protocol/nats_jetstream/v3/go.mod @@ -0,0 +1,27 @@ +module github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3 + +go 1.18 + +replace github.com/cloudevents/sdk-go/v2 => ../../../v2 + +require ( + github.com/cloudevents/sdk-go/v2 v2.15.2 + github.com/nats-io/nats.go v1.37.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/google/go-cmp v0.5.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/testify v1.8.0 // indirect + golang.org/x/crypto v0.27.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/protocol/nats_jetstream/v3/go.sum b/protocol/nats_jetstream/v3/go.sum new file mode 100644 index 000000000..a6562f7ad --- /dev/null +++ b/protocol/nats_jetstream/v3/go.sum @@ -0,0 +1,47 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/protocol/nats_jetstream/v3/message.go b/protocol/nats_jetstream/v3/message.go new file mode 100644 index 000000000..b88779a3f --- /dev/null +++ b/protocol/nats_jetstream/v3/message.go @@ -0,0 +1,145 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "bytes" + "context" + "fmt" + "strings" + + "github.com/nats-io/nats.go/jetstream" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/format" + "github.com/cloudevents/sdk-go/v2/binding/spec" +) + +const ( + // see https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/nats-protocol-binding.md + prefix = "ce-" + contentTypeHeader = "content-type" +) + +var specs = spec.WithPrefix(prefix) + +// Message implements binding.Message by wrapping an jetstream.Msg. +// This message *can* be read several times safely +type Message struct { + Msg jetstream.Msg + encoding binding.Encoding +} + +// NewMessage wraps an *nats.Msg in a binding.Message. +// The returned message *can* be read several times safely +// The default encoding returned is EncodingStructured unless the NATS message contains a specversion header. +func NewMessage(msg jetstream.Msg) *Message { + encoding := binding.EncodingStructured + if msg.Headers() != nil { + if msg.Headers().Get(specs.PrefixedSpecVersionName()) != "" { + encoding = binding.EncodingBinary + } + } + return &Message{Msg: msg, encoding: encoding} +} + +var _ binding.Message = (*Message)(nil) + +// ReadEncoding return the type of the message Encoding. +func (m *Message) ReadEncoding() binding.Encoding { + return m.encoding +} + +// ReadStructured transfers a structured-mode event to a StructuredWriter. +func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error { + if m.encoding != binding.EncodingStructured { + return binding.ErrNotStructured + } + return encoder.SetStructuredEvent(ctx, format.JSON, bytes.NewReader(m.Msg.Data())) +} + +// ReadBinary transfers a binary-mode event to an BinaryWriter. +func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error { + if m.encoding != binding.EncodingBinary { + return binding.ErrNotBinary + } + + version := m.GetVersion() + if version == nil { + return binding.ErrNotBinary + } + + var err error + for k, v := range m.Msg.Headers() { + headerValue := v[0] + if strings.HasPrefix(k, prefix) { + attr := version.Attribute(k) + if attr != nil { + err = encoder.SetAttribute(attr, headerValue) + } else { + err = encoder.SetExtension(strings.TrimPrefix(k, prefix), headerValue) + } + } else if k == contentTypeHeader { + err = encoder.SetAttribute(version.AttributeFromKind(spec.DataContentType), headerValue) + } + if err != nil { + return err + } + } + + if m.Msg.Data() != nil { + err = encoder.SetData(bytes.NewBuffer(m.Msg.Data())) + } + + return err +} + +// Finish *must* be called when message from a Receiver can be forgotten by the receiver. +func (m *Message) Finish(err error) error { + return nil +} + +// GetAttribute implements binding.MessageMetadataReader +func (m *Message) GetAttribute(attributeKind spec.Kind) (spec.Attribute, interface{}) { + key := withPrefix(attributeKind.String()) + if m.Msg.Headers() != nil { + headerValue := m.Msg.Headers().Get(key) + if headerValue != "" { + version := m.GetVersion() + return version.Attribute(key), headerValue + } + } + return nil, nil +} + +// GetExtension implements binding.MessageMetadataReader +func (m *Message) GetExtension(name string) interface{} { + key := withPrefix(name) + if m.Msg.Headers() != nil { + headerValue := m.Msg.Headers().Get(key) + if headerValue != "" { + return headerValue + } + } + return nil +} + +// GetVersion looks for specVersion header and returns a Version object +func (m *Message) GetVersion() spec.Version { + if m.Msg.Headers() == nil { + return nil + } + versionValue := m.Msg.Headers().Get(specs.PrefixedSpecVersionName()) + if versionValue == "" { + return nil + } + return specs.Version(versionValue) +} + +// withPrefix prepends the prefix to the attribute name +func withPrefix(attributeName string) string { + return fmt.Sprintf("%s%s", prefix, attributeName) +} diff --git a/protocol/nats_jetstream/v3/message_test.go b/protocol/nats_jetstream/v3/message_test.go new file mode 100644 index 000000000..553540587 --- /dev/null +++ b/protocol/nats_jetstream/v3/message_test.go @@ -0,0 +1,111 @@ +package nats_jetstream + +import ( + "context" + "encoding/json" + "testing" + + "github.com/cloudevents/sdk-go/v2/binding/spec" + bindingtest "github.com/cloudevents/sdk-go/v2/binding/test" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/test" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +type jetStreamMsg struct { + jetstream.Msg + msg *nats.Msg +} + +func (j *jetStreamMsg) Data() []byte { return j.msg.Data } +func (j *jetStreamMsg) Headers() nats.Header { return j.msg.Header } + +var ( + outBinaryMessage = bindingtest.MockBinaryMessage{ + Metadata: map[spec.Attribute]interface{}{}, + Extensions: map[string]interface{}{}, + } + outStructMessage = bindingtest.MockStructuredMessage{} + + testEvent = test.FullEvent() + binaryData, _ = json.Marshal(map[string]string{ + "ce_type": testEvent.Type(), + "ce_source": testEvent.Source(), + "ce_id": testEvent.ID(), + "ce_time": test.Timestamp.String(), + "ce_specversion": "1.0", + "ce_dataschema": test.Schema.String(), + "ce_datacontenttype": "text/json", + "ce_subject": "receiverTopic", + "ce_exta": "someext", + }) + structuredConsumerMessage = &jetStreamMsg{ + msg: &nats.Msg{ + Subject: "hello", + Data: binaryData, + }, + } + binaryConsumerMessage = &jetStreamMsg{ + msg: &nats.Msg{ + Subject: "hello", + Data: testEvent.Data(), + Header: nats.Header{ + "ce-type": {testEvent.Type()}, + "ce-source": {testEvent.Source()}, + "ce-id": {testEvent.ID()}, + "ce-time": {test.Timestamp.String()}, + "ce-specversion": {"1.0"}, + "ce-dataschema": {test.Schema.String()}, + "ce-datacontenttype": {"text/json"}, + "ce-subject": {"receiverTopic"}, + "ce-exta": {"someext"}, + }, + }, + } +) + +func TestNewMessage(t *testing.T) { + tests := []struct { + name string + consumerMessage jetstream.Msg + expectedEncoding binding.Encoding + expectedStructuredError error + expectedBinaryError error + }{ + { + name: "Structured encoding", + consumerMessage: structuredConsumerMessage, + expectedEncoding: binding.EncodingStructured, + expectedStructuredError: nil, + expectedBinaryError: binding.ErrNotBinary, + }, + { + name: "Binary encoding", + consumerMessage: binaryConsumerMessage, + expectedEncoding: binding.EncodingBinary, + expectedStructuredError: binding.ErrNotStructured, + expectedBinaryError: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := NewMessage(tt.consumerMessage) + if got == nil { + t.Errorf("Error in NewMessage!") + } + err := got.ReadBinary(context.TODO(), &outBinaryMessage) + if err != tt.expectedBinaryError { + t.Errorf("ReadBinary err:%s", err.Error()) + } + err = got.ReadStructured(context.TODO(), &outStructMessage) + if err != tt.expectedStructuredError { + t.Errorf("ReadStructured err:%s", err.Error()) + } + if got.ReadEncoding() != tt.expectedEncoding { + t.Errorf("ExpectedEncoding %s, while got %s", tt.expectedEncoding, got.ReadEncoding()) + } + }) + } +} diff --git a/protocol/nats_jetstream/v3/options.go b/protocol/nats_jetstream/v3/options.go new file mode 100644 index 000000000..a0fea6634 --- /dev/null +++ b/protocol/nats_jetstream/v3/options.go @@ -0,0 +1,92 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "errors" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +var ( + ErrNoFilterSubjects = errors.New("no filter subjects were given") + ErrMoreThanOneStream = errors.New("more than one stream for given filter subjects") + ErrNoConsumerConfig = errors.New("no consumer config was given") + ErrMoreThanOneConsumerConfig = errors.New("more than one consumer config given") +) + +// NatsOptions is a helper function to group a variadic nats.ProtocolOption into +// []nats.Option that can be used by either Sender, Consumer or Protocol +func NatsOptions(opts ...nats.Option) []nats.Option { + return opts +} + +// ProtocolOption is the function signature required to be considered an nats.ProtocolOption. +type ProtocolOption func(*Protocol) error + +// WithJetStreamOptions configures the Sender and/or Consumer +func WithJetStreamOptions(jetStreamOpts []jetstream.JetStreamOpt) ProtocolOption { + return func(p *Protocol) error { + if p.Sender != nil { + p.Sender.JetSteamOpts = jetStreamOpts + } + if p.Consumer != nil { + p.Consumer.JetSteamOpts = jetStreamOpts + } + return nil + } +} + +// WithPublishOptions configures the Sender +func WithPublishOptions(publishOpts []jetstream.PublishOpt) ProtocolOption { + return func(p *Protocol) error { + if p.Sender == nil { + return nil + } + p.Sender.PublishOpts = publishOpts + return nil + } +} + +// WithConsumerConfig configures the Consumer with the given config +func WithConsumerConfig(consumerConfig *jetstream.ConsumerConfig) ProtocolOption { + return func(p *Protocol) error { + if p.Consumer == nil { + return nil + } + if p.Consumer.OrderedConsumerConfig != nil { + return ErrMoreThanOneConsumerConfig + } + p.Consumer.ConsumerConfig = consumerConfig + return nil + } +} + +// WithOrderedConsumerConfig configures the Consumer with the given config +func WithOrderedConsumerConfig(orderedConsumerConfig *jetstream.OrderedConsumerConfig) ProtocolOption { + return func(p *Protocol) error { + if p.Consumer == nil { + return nil + } + if p.Consumer.ConsumerConfig != nil { + return ErrMoreThanOneConsumerConfig + } + p.Consumer.OrderedConsumerConfig = orderedConsumerConfig + return nil + } +} + +// WithPullConsumerOptions configures the Consumer with the given pullConsumeOpts +func WithPullConsumerOptions(pullConsumeOpt []jetstream.PullConsumeOpt) ProtocolOption { + return func(p *Protocol) error { + if p.Consumer == nil { + return nil + } + p.Consumer.PullConsumeOpt = pullConsumeOpt + return nil + } +} diff --git a/protocol/nats_jetstream/v3/options_test.go b/protocol/nats_jetstream/v3/options_test.go new file mode 100644 index 000000000..4be51dd25 --- /dev/null +++ b/protocol/nats_jetstream/v3/options_test.go @@ -0,0 +1,176 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "reflect" + "testing" + + "github.com/nats-io/nats.go/jetstream" +) + +func TestWithConsumerConfig(t *testing.T) { + filterSubjects := []string{"normal"} + type args struct { + consumer *consumer + config *jetstream.ConsumerConfig + } + type wants struct { + err error + consumer *consumer + } + tests := []struct { + name string + args args + wants wants + }{ + { + name: "valid case", + args: args{ + consumer: &consumer{}, + config: &jetstream.ConsumerConfig{FilterSubjects: filterSubjects}, + }, + wants: wants{ + err: nil, + consumer: &consumer{ + ConsumerConfig: &jetstream.ConsumerConfig{FilterSubjects: filterSubjects}, + }, + }, + }, + { + name: "too many consumer options", + args: args{ + consumer: &consumer{OrderedConsumerConfig: &jetstream.OrderedConsumerConfig{FilterSubjects: filterSubjects}}, + config: &jetstream.ConsumerConfig{FilterSubjects: filterSubjects}, + }, + wants: wants{ + err: ErrMoreThanOneConsumerConfig, + consumer: &consumer{OrderedConsumerConfig: &jetstream.OrderedConsumerConfig{FilterSubjects: filterSubjects}}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotErr := tt.args.consumer.applyOptions(WithConsumerConfig(tt.args.config)) + if gotErr != tt.wants.err { + t.Errorf("applyOptions(WithConsumerConfig()) = %v, want %v", gotErr, tt.wants.err) + } + + if !reflect.DeepEqual(tt.args.consumer, tt.wants.consumer) { + t.Errorf("p = %v, want %v", tt.args.consumer, tt.wants.consumer) + } + }) + } +} + +func TestOrderedConsumerConfig(t *testing.T) { + filterSubjects := []string{"ordered"} + type args struct { + consumer *consumer + config *jetstream.OrderedConsumerConfig + } + type wants struct { + err error + consumer *consumer + } + tests := []struct { + name string + args args + wants wants + }{ + { + name: "valid case", + args: args{ + consumer: &consumer{}, + config: &jetstream.OrderedConsumerConfig{FilterSubjects: filterSubjects}, + }, + wants: wants{ + err: nil, + consumer: &consumer{ + OrderedConsumerConfig: &jetstream.OrderedConsumerConfig{FilterSubjects: filterSubjects}, + }, + }, + }, + { + name: "too many consumer options", + args: args{ + consumer: &consumer{ConsumerConfig: &jetstream.ConsumerConfig{FilterSubjects: filterSubjects}}, + config: &jetstream.OrderedConsumerConfig{FilterSubjects: filterSubjects}, + }, + wants: wants{ + err: ErrMoreThanOneConsumerConfig, + consumer: &consumer{ConsumerConfig: &jetstream.ConsumerConfig{FilterSubjects: filterSubjects}}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotErr := tt.args.consumer.applyOptions(WithOrderedConsumerConfig(tt.args.config)) + if gotErr != tt.wants.err { + t.Errorf("applyOptions(WithOrderedConsumerConfig()) = %v, want %v", gotErr, tt.wants.err) + } + + if !reflect.DeepEqual(tt.args.consumer, tt.wants.consumer) { + t.Errorf("p = %v, want %v", tt.args.consumer, tt.wants.consumer) + } + }) + } +} + +func TestWithPullConsumeOptions(t *testing.T) { + maxMessages := jetstream.PullMaxMessages(1) + maxBytes := jetstream.PullMaxBytes(0) + type args struct { + consumer *consumer + config []jetstream.PullConsumeOpt + } + type wants struct { + err error + consumer *consumer + } + tests := []struct { + name string + args args + wants wants + }{ + { + name: "pull consumer option given", + args: args{ + consumer: &consumer{}, + config: []jetstream.PullConsumeOpt{maxMessages, maxBytes}, + }, + wants: wants{ + err: nil, + consumer: &consumer{ + PullConsumeOpt: []jetstream.PullConsumeOpt{maxMessages, maxBytes}, + }, + }, + }, + { + name: "no pull consumer option given", + args: args{ + consumer: &consumer{}, + config: nil, + }, + wants: wants{ + err: nil, + consumer: &consumer{PullConsumeOpt: nil}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotErr := tt.args.consumer.applyOptions(WithPullConsumerOptions(tt.args.config)) + if gotErr != tt.wants.err { + t.Errorf("applyOptions(WithPullConsumerOptions()) = %v, want %v", gotErr, tt.wants.err) + } + + if !reflect.DeepEqual(tt.args.consumer, tt.wants.consumer) { + t.Errorf("p = %v, want %v", tt.args.consumer, tt.wants.consumer) + } + }) + } +} diff --git a/protocol/nats_jetstream/v3/protocol.go b/protocol/nats_jetstream/v3/protocol.go new file mode 100644 index 000000000..a0e74bbdc --- /dev/null +++ b/protocol/nats_jetstream/v3/protocol.go @@ -0,0 +1,109 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "context" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/protocol" + + "github.com/nats-io/nats.go" +) + +// Protocol is a reference implementation for using the CloudEvents binding +// integration. Protocol acts as both a NATS client and a NATS handler. +type Protocol struct { + Conn *nats.Conn + + Consumer *consumer + + Sender *sender + + connOwned bool // whether this protocol created the nats connection +} + +// NewProtocol creates a new NATS protocol. +func NewProtocol(ctx context.Context, url, sendSubject string, natsOpts []nats.Option, opts ...ProtocolOption) (*Protocol, error) { + conn, err := nats.Connect(url, natsOpts...) + if err != nil { + return nil, err + } + + p, err := NewProtocolFromConn(ctx, conn, sendSubject, opts...) + if err != nil { + conn.Close() + return nil, err + } + + p.connOwned = true + + return p, nil +} + +// NewProtocolFromConn creates a new NATS protocol with the given nats connection. +func NewProtocolFromConn(ctx context.Context, conn *nats.Conn, sendSubject string, opts ...ProtocolOption) (*Protocol, error) { + var err error + p := &Protocol{ + Conn: conn, + } + + if p.Consumer, err = NewConsumerFromConn(ctx, conn, opts...); err != nil { + return nil, err + } + + if p.Sender, err = NewSenderFromConn(ctx, conn, sendSubject, opts...); err != nil { + return nil, err + } + + return p, nil +} + +// Send implements Sender.Send +func (p *Protocol) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) error { + return p.Sender.Send(ctx, in, transformers...) +} + +// OpenInbound implements Opener.OpenInbound +func (p *Protocol) OpenInbound(ctx context.Context) error { + return p.Consumer.OpenInbound(ctx) +} + +// Receive implements Receiver.Receive +func (p *Protocol) Receive(ctx context.Context) (binding.Message, error) { + return p.Consumer.Receive(ctx) +} + +// Close implements Closer.Close +func (p *Protocol) Close(ctx context.Context) error { + if p.connOwned { + defer p.Conn.Close() + } + + if err := p.Consumer.Close(ctx); err != nil { + return err + } + + if err := p.Sender.Close(ctx); err != nil { + return err + } + + return nil +} + +func (p *Protocol) applyOptions(opts ...ProtocolOption) error { + for _, fn := range opts { + if err := fn(p); err != nil { + return err + } + } + return nil +} + +var _ protocol.Receiver = (*Protocol)(nil) +var _ protocol.Sender = (*Protocol)(nil) +var _ protocol.Opener = (*Protocol)(nil) +var _ protocol.Closer = (*Protocol)(nil) diff --git a/protocol/nats_jetstream/v3/receiver.go b/protocol/nats_jetstream/v3/receiver.go new file mode 100644 index 000000000..f1638020a --- /dev/null +++ b/protocol/nats_jetstream/v3/receiver.go @@ -0,0 +1,240 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "context" + "io" + "sync" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/protocol" +) + +type msgErr struct { + msg binding.Message +} + +type receiver struct { + incoming chan msgErr +} + +// newReceiver creates a new protocol.Receiver responsible for receiving messages. +func newReceiver() *receiver { + return &receiver{ + incoming: make(chan msgErr), + } +} + +// MsgHandler implements nats.MsgHandler and publishes messages onto our internal incoming channel to be delivered +// via r.Receive(ctx) +func (r *receiver) MsgHandler(msg jetstream.Msg) { + r.incoming <- msgErr{msg: NewMessage(msg)} +} + +// Receive implements Receiver.Receive. +func (r *receiver) Receive(ctx context.Context) (binding.Message, error) { + select { + case msgErr, ok := <-r.incoming: + if !ok { + return nil, io.EOF + } + return msgErr.msg, nil + case <-ctx.Done(): + return nil, io.EOF + } +} + +type consumer struct { + receiver + + Conn *nats.Conn + JetSteamOpts []jetstream.JetStreamOpt + JetStream jetstream.JetStream + ConsumerConfig *jetstream.ConsumerConfig + OrderedConsumerConfig *jetstream.OrderedConsumerConfig + PullConsumeOpt []jetstream.PullConsumeOpt + JetstreamConsumer jetstream.Consumer + + subMtx sync.Mutex + internalClose chan struct{} + connOwned bool +} + +// NewConsumer consumes from filterSubjects given in ConsumerConfig or OrderedConsumerConfig +// embedded in ConsumerOption. +// See: WithConsumerConfig(...) and WithOrderedConsumerConfig(...) +func NewConsumer(ctx context.Context, url string, natsOpts []nats.Option, opts ...ProtocolOption) (*consumer, error) { + conn, err := nats.Connect(url, natsOpts...) + if err != nil { + return nil, err + } + + c, err := NewConsumerFromConn(ctx, conn, opts...) + if err != nil { + conn.Close() + return nil, err + } + + c.connOwned = true + + return c, err +} + +// NewConsumerFromConn consumes from filterSubjects given in ConsumerConfig or OrderedConsumerConfig +// embedded in ConsumerOption. See: WithConsumerConfig(...) and WithOrderedConsumerConfig(...) +func NewConsumerFromConn(ctx context.Context, conn *nats.Conn, opts ...ProtocolOption) (*consumer, error) { + c := &consumer{ + receiver: *newReceiver(), + Conn: conn, + internalClose: make(chan struct{}, 1), + } + + if err := c.applyOptions(opts...); err != nil { + return nil, err + } + + var err error + if c.JetStream, err = jetstream.New(conn, c.JetSteamOpts...); err != nil { + return nil, err + } + + return c, nil +} + +// OpenInbound implements Opener.OpenInbound +func (c *consumer) OpenInbound(ctx context.Context) error { + c.subMtx.Lock() + defer c.subMtx.Unlock() + + var consumeContext jetstream.ConsumeContext + var err error + if err = c.createConsumer(ctx); err != nil { + return err + } + if consumeContext, err = c.JetstreamConsumer.Consume(c.MsgHandler, c.PullConsumeOpt...); err != nil { + return err + } + + // Wait until external or internal context done + select { + case <-ctx.Done(): + case <-c.internalClose: + } + + // Finish to consume messages in the queue and close the subscription + if consumeContext != nil { + consumeContext.Drain() + } + return nil +} + +// Close implements Closer.Close +func (c *consumer) Close(ctx context.Context) error { + // Before closing, let's be sure OpenInbound completes + // We send a signal to close and then we lock on subMtx in order + // to wait OpenInbound to finish draining the queue + c.internalClose <- struct{}{} + c.subMtx.Lock() + defer c.subMtx.Unlock() + + if c.connOwned { + c.Conn.Close() + } + + close(c.internalClose) + + return nil +} + +func (c *consumer) applyOptions(opts ...ProtocolOption) error { + p := &Protocol{Consumer: c} + for _, fn := range opts { + if err := fn(p); err != nil { + return err + } + } + return nil +} + +// createConsumer creates a consumer based on the configured consumer config +func (c *consumer) createConsumer(ctx context.Context) error { + var err error + var stream string + if stream, err = c.getStreamFromSubjects(ctx); err != nil { + return err + } + consumerTypeValue := c.getConsumerType() + switch consumerTypeValue { + case consumerType_Unknown: + return ErrNoConsumerConfig + case consumerType_Ordinary: + c.JetstreamConsumer, err = c.JetStream.CreateOrUpdateConsumer(ctx, stream, *c.ConsumerConfig) + case consumerType_Ordered: + c.JetstreamConsumer, err = c.JetStream.OrderedConsumer(ctx, stream, *c.OrderedConsumerConfig) + } + return err +} + +// getConsumerType returns consumer type based on what has been configured +func (c *consumer) getConsumerType() consumerType { + if c.ConsumerConfig != nil { + return consumerType_Ordinary + } + if c.OrderedConsumerConfig != nil { + return consumerType_Ordered + } + return consumerType_Unknown +} + +// getStreamFromSubjects finds the unique stream for the set of filter subjects +// If more than one stream is found, returns ErrMoreThanOneStream +func (c *consumer) getStreamFromSubjects(ctx context.Context) (string, error) { + var subjects []string + if c.ConsumerConfig != nil && c.ConsumerConfig.FilterSubject != "" { + subjects = []string{c.ConsumerConfig.FilterSubject} + } + if c.ConsumerConfig != nil && len(c.ConsumerConfig.FilterSubjects) > 0 { + subjects = c.ConsumerConfig.FilterSubjects + } + if c.OrderedConsumerConfig != nil && len(c.OrderedConsumerConfig.FilterSubjects) > 0 { + subjects = c.OrderedConsumerConfig.FilterSubjects + } + if len(subjects) == 0 { + return "", ErrNoFilterSubjects + } + var finalStream string + for i, subject := range subjects { + currentStream, err := c.JetStream.StreamNameBySubject(ctx, subject) + if err != nil { + return "", err + } + if i == 0 { + finalStream = currentStream + continue + } + if finalStream != currentStream { + return "", ErrMoreThanOneStream + } + } + return finalStream, nil +} + +var _ protocol.Opener = (*consumer)(nil) +var _ protocol.Receiver = (*consumer)(nil) +var _ protocol.Closer = (*consumer)(nil) + +// consumerType - consumer types that have configurations defined in jetstream package +type consumerType int + +const ( + consumerType_Unknown consumerType = iota + consumerType_Ordinary + consumerType_Ordered +) diff --git a/protocol/nats_jetstream/v3/sender.go b/protocol/nats_jetstream/v3/sender.go new file mode 100644 index 000000000..ebdd3eea6 --- /dev/null +++ b/protocol/nats_jetstream/v3/sender.go @@ -0,0 +1,122 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "bytes" + "context" + "fmt" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/protocol" +) + +type sender struct { + JetSteamOpts []jetstream.JetStreamOpt + JetStream jetstream.JetStream + PublishOpts []jetstream.PublishOpt + Conn *nats.Conn + Subject string + connOwned bool +} + +// NewSender creates a new protocol.Sender responsible for opening and closing the NATS connection +func NewSender(ctx context.Context, url, subject string, natsOpts []nats.Option, opts ...ProtocolOption) (*sender, error) { + conn, err := nats.Connect(url, natsOpts...) + if err != nil { + return nil, err + } + + s, err := NewSenderFromConn(ctx, conn, subject, opts...) + if err != nil { + conn.Close() + return nil, err + } + + s.connOwned = true + + return s, nil +} + +// NewSenderFromConn creates a new protocol.Sender which leaves responsibility for opening and closing the NATS +// connection to the caller +func NewSenderFromConn(ctx context.Context, conn *nats.Conn, subject string, opts ...ProtocolOption) (*sender, error) { + s := &sender{ + Conn: conn, + Subject: subject, + } + + if err := s.applyOptions(opts...); err != nil { + return nil, err + } + + var err error + if s.JetStream, err = jetstream.New(conn, s.JetSteamOpts...); err != nil { + return nil, err + } + + return s, nil +} + +// Close implements Sender.Sender +// Sender sends messages. +func (s *sender) Send(ctx context.Context, in binding.Message, transformers ...binding.Transformer) (err error) { + defer func() { + if err2 := in.Finish(err); err2 != nil { + if err == nil { + err = err2 + } else { + err = fmt.Errorf("failed to call in.Finish() when error already occurred: %s: %w", err2.Error(), err) + } + } + }() + + if _, err = s.JetStream.StreamNameBySubject(ctx, s.Subject); err != nil { + return err + } + + writer := new(bytes.Buffer) + header, err := WriteMsg(ctx, in, writer, transformers...) + if err != nil { + return err + } + + natsMsg := &nats.Msg{ + Subject: s.Subject, + Data: writer.Bytes(), + Header: header, + } + + _, err = s.JetStream.PublishMsg(ctx, natsMsg, s.PublishOpts...) + + return err +} + +// Close implements Closer.Close +// This method only closes the connection if the Sender opened it +func (s *sender) Close(_ context.Context) error { + if s.connOwned { + s.Conn.Close() + } + + return nil +} + +func (s *sender) applyOptions(opts ...ProtocolOption) error { + p := &Protocol{Sender: s} + for _, fn := range opts { + if err := fn(p); err != nil { + return err + } + } + return nil +} + +var _ protocol.Sender = (*sender)(nil) +var _ protocol.Closer = (*Protocol)(nil) diff --git a/protocol/nats_jetstream/v3/write_message.go b/protocol/nats_jetstream/v3/write_message.go new file mode 100644 index 000000000..2fe3c598c --- /dev/null +++ b/protocol/nats_jetstream/v3/write_message.go @@ -0,0 +1,98 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/binding/format" + "github.com/cloudevents/sdk-go/v2/binding/spec" + "github.com/nats-io/nats.go" +) + +// WriteMsg fills the provided writer with the bindings.Message m. +// Using context you can tweak the encoding processing (more details on binding.Write documentation). +// The nats.Header returned is not deep-copied. The header values should be deep-copied to an event object. +func WriteMsg(ctx context.Context, m binding.Message, writer io.ReaderFrom, transformers ...binding.Transformer) (nats.Header, error) { + structuredWriter := &natsMessageWriter{writer} + binaryWriter := &natsBinaryMessageWriter{ReaderFrom: writer} + + _, err := binding.Write( + ctx, + m, + structuredWriter, + binaryWriter, + transformers..., + ) + natsHeader := binaryWriter.header + + return natsHeader, err +} + +type natsMessageWriter struct { + io.ReaderFrom +} + +// StructuredWriter implements StructuredWriter.SetStructuredEvent +func (w *natsMessageWriter) SetStructuredEvent(_ context.Context, _ format.Format, event io.Reader) error { + if _, err := w.ReadFrom(event); err != nil { + return err + } + + return nil +} + +var _ binding.StructuredWriter = (*natsMessageWriter)(nil) // Test it conforms to the interface + +type natsBinaryMessageWriter struct { + io.ReaderFrom + header nats.Header +} + +// SetAttribute implements MessageMetadataWriter.SetAttribute +func (w *natsBinaryMessageWriter) SetAttribute(attribute spec.Attribute, value interface{}) error { + prefixedName := withPrefix(attribute.Name()) + convertedValue := fmt.Sprint(value) + switch attribute.Kind().String() { + case spec.Time.String(): + timeValue := value.(time.Time) + convertedValue = timeValue.Format(time.RFC3339Nano) + } + w.header.Set(prefixedName, convertedValue) + return nil +} + +// SetExtension implements MessageMetadataWriter.SetExtension +func (w *natsBinaryMessageWriter) SetExtension(name string, value interface{}) error { + prefixedName := withPrefix(name) + convertedValue := fmt.Sprint(value) + w.header.Set(prefixedName, convertedValue) + return nil +} + +// Start implements BinaryWriter.Start +func (w *natsBinaryMessageWriter) Start(ctx context.Context) error { + w.header = nats.Header{} + return nil +} + +// SetData implements BinaryWriter.SetData +func (w *natsBinaryMessageWriter) SetData(data io.Reader) error { + if _, err := w.ReadFrom(data); err != nil { + return err + } + + return nil +} + +// End implements BinaryWriter.End +func (w *natsBinaryMessageWriter) End(ctx context.Context) error { + return nil +} diff --git a/samples/nats_jetstream/v3/go.mod b/samples/nats_jetstream/v3/go.mod new file mode 100644 index 000000000..9c8e89cec --- /dev/null +++ b/samples/nats_jetstream/v3/go.mod @@ -0,0 +1,31 @@ +module github.com/cloudevents/sdk-go/samples/nats_jetstream/v3 + +go 1.18 + +require ( + github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3 v3.0.0 + github.com/cloudevents/sdk-go/v2 v2.15.2 + github.com/google/uuid v1.1.1 + github.com/kelseyhightower/envconfig v1.4.0 + github.com/nats-io/nats.go v1.37.0 +) + +require ( + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + go.uber.org/atomic v1.4.0 // indirect + go.uber.org/multierr v1.1.0 // indirect + go.uber.org/zap v1.10.0 // indirect + golang.org/x/crypto v0.27.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect + golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect +) + +replace github.com/cloudevents/sdk-go/v2 => ../../../v2 + +replace github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3 => ./../../../protocol/nats_jetstream/v3 diff --git a/samples/nats_jetstream/v3/go.sum b/samples/nats_jetstream/v3/go.sum new file mode 100644 index 000000000..986929c03 --- /dev/null +++ b/samples/nats_jetstream/v3/go.sum @@ -0,0 +1,47 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= +github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M= +golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/samples/nats_jetstream/v3/receiver/main.go b/samples/nats_jetstream/v3/receiver/main.go new file mode 100644 index 000000000..2328bb407 --- /dev/null +++ b/samples/nats_jetstream/v3/receiver/main.go @@ -0,0 +1,61 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "fmt" + "log" + + "github.com/nats-io/nats.go/jetstream" + + cejsm "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3" + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +func main() { + ctx := context.Background() + + natsURL := "nats://localhost:4222" + natsSubject := "sample" + + consumerOpt := cejsm.WithConsumerConfig(&jetstream.ConsumerConfig{FilterSubjects: []string{natsSubject}}) + consumer, err := cejsm.NewConsumer(ctx, natsURL, cejsm.NatsOptions(), consumerOpt) + if err != nil { + log.Fatalf("failed to create nats protocol, %s", err.Error()) + } + + defer consumer.Close(ctx) + + c, err := cloudevents.NewClient(consumer) + if err != nil { + log.Fatalf("failed to create client, %s", err.Error()) + } + + for { + if err := c.StartReceiver(ctx, receive); err != nil { + log.Printf("failed to start nats receiver, %s", err.Error()) + } + } +} + +type Example struct { + Sequence int `json:"id"` + Message string `json:"message"` +} + +func receive(ctx context.Context, event cloudevents.Event) error { + fmt.Printf("Got Event Context: %+v\n", event.Context) + + data := &Example{} + if err := event.DataAs(data); err != nil { + fmt.Printf("Got Data Error: %s\n", err.Error()) + } + fmt.Printf("Got Data: %+v\n", data) + + fmt.Printf("----------------------------\n") + return nil +} diff --git a/samples/nats_jetstream/v3/sender/main.go b/samples/nats_jetstream/v3/sender/main.go new file mode 100644 index 000000000..8434435d1 --- /dev/null +++ b/samples/nats_jetstream/v3/sender/main.go @@ -0,0 +1,80 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package main + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/google/uuid" + "github.com/kelseyhightower/envconfig" + + cejsm "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3" + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +const ( + count = 10 +) + +type envConfig struct { + // NATSServer URL to connect to the nats server. + NATSServer string `envconfig:"NATS_SERVER" default:"http://localhost:4222" required:"true"` + + // Subject is the nats subject to publish cloudevents on. + Subject string `envconfig:"SUBJECT" default:"sample" required:"true"` +} + +type Example struct { + Sequence int `json:"id"` + Message string `json:"message"` +} + +func main() { + var env envConfig + if err := envconfig.Process("", &env); err != nil { + log.Fatalf("Failed to process env var: %s", err) + } + + natsURL := "nats://localhost:4222" + natsSubject := "sample" + + ctx := context.Background() + sender, err := cejsm.NewSender(ctx, natsURL, natsSubject, cejsm.NatsOptions(), nil) + if err != nil { + log.Fatalf("Failed to create nats protocol, %s", err.Error()) + } + + defer sender.Close(context.Background()) + + c, err := cloudevents.NewClient(sender) + if err != nil { + log.Fatalf("Failed to create client, %s", err.Error()) + } + + for _, contentType := range []string{"application/json", "application/xml"} { + for i := 0; i < count; i++ { + e := cloudevents.NewEvent() + e.SetID(uuid.New().String()) + e.SetType("com.cloudevents.sample.sent") + e.SetTime(time.Now()) + e.SetSource("https://github.com/cloudevents/sdk-go/v2/samples/sender") + _ = e.SetData(contentType, &Example{ + Sequence: i, + Message: fmt.Sprintf("Hello, %s!", contentType), + }) + + if result := c.Send(context.Background(), e); cloudevents.IsUndelivered(result) { + log.Printf("failed to send: %v", err) + } else { + log.Printf("sent: %d, accepted: %t", i, cloudevents.IsACK(result)) + } + time.Sleep(100 * time.Millisecond) + } + } +} diff --git a/test/integration/go.mod b/test/integration/go.mod index b80046249..2ffc9a039 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -14,6 +14,8 @@ replace github.com/cloudevents/sdk-go/protocol/nats/v2 => ../../protocol/nats/v2 replace github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2 => ../../protocol/nats_jetstream/v2 +replace github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3 => ../../protocol/nats_jetstream/v3 + replace github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 => ../../protocol/kafka_sarama/v2 replace github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 => ../../protocol/mqtt_paho/v2 @@ -29,13 +31,14 @@ require ( github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2 v2.0.0-00010101000000-000000000000 github.com/cloudevents/sdk-go/protocol/nats/v2 v2.5.0 github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2 v2.0.0-00010101000000-000000000000 + github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3 v3.0.0-00010101000000-000000000000 github.com/cloudevents/sdk-go/protocol/stan/v2 v2.5.0 github.com/cloudevents/sdk-go/v2 v2.15.2 github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 github.com/eclipse/paho.golang v0.12.0 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.3.0 - github.com/nats-io/nats.go v1.31.0 + github.com/nats-io/nats.go v1.37.0 github.com/nats-io/stan.go v0.10.4 github.com/stretchr/testify v1.8.4 go.uber.org/atomic v1.4.0 @@ -61,14 +64,14 @@ require ( github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.17.2 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.16 // indirect github.com/minio/highwayhash v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 // indirect - github.com/nats-io/nkeys v0.4.6 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -77,10 +80,11 @@ require ( go.etcd.io/bbolt v1.3.6 // indirect go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 // indirect - golang.org/x/crypto v0.21.0 // indirect + golang.org/x/crypto v0.27.0 // indirect golang.org/x/net v0.23.0 // indirect - golang.org/x/sync v0.4.0 // indirect - golang.org/x/sys v0.18.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/test/integration/go.sum b/test/integration/go.sum index 422913dd6..b0115c807 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -90,8 +90,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= -github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo= @@ -117,13 +117,13 @@ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/nats-server/v2 v2.9.23 h1:6Wj6H6QpP9FMlpCyWUaNu2yeZ/qGj+mdRkZ1wbikExU= -github.com/nats-io/nats-streaming-server v0.24.3 h1:uZez8jBkXscua++jaDsK7DhpSAkizdetar6yWbPMRco= +github.com/nats-io/nats-streaming-server v0.24.6 h1:iIZXuPSznnYkiy0P3L0AP9zEN9Etp+tITbbX1KKeq4Q= github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= -github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= -github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= -github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= -github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nats-io/stan.go v0.10.4 h1:19GS/eD1SeQJaVkeM9EkvEYattnvnWrZ3wkSWSw4uXw= @@ -179,8 +179,8 @@ golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU= -golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= -golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= @@ -203,8 +203,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= -golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -219,8 +219,8 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= @@ -229,6 +229,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 h1:GZokNIeuVkl3aZHJchRrr13WCsols02MLUcz1U9is6M= golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/test/integration/nats_jetstream/v3/nats_test.go b/test/integration/nats_jetstream/v3/nats_test.go new file mode 100644 index 000000000..07bda817b --- /dev/null +++ b/test/integration/nats_jetstream/v3/nats_test.go @@ -0,0 +1,174 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package nats_jetstream + +import ( + "context" + "os" + "testing" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + + ce_nats "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/event" + bindings "github.com/cloudevents/sdk-go/v2/protocol" + "github.com/cloudevents/sdk-go/v2/protocol/test" + . "github.com/cloudevents/sdk-go/v2/test" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + . "github.com/cloudevents/sdk-go/v2/binding/test" +) + +func TestSendReceiveStructuredAndBinary(t *testing.T) { + conn := testConn(t) + defer conn.Close() + + type args struct { + opts []ce_nats.ProtocolOption + bindingEncoding binding.Encoding + } + tests := []struct { + name string + args args + }{ + { + name: "regular consumer - structured", + args: args{ + opts: []ce_nats.ProtocolOption{ + ce_nats.WithConsumerConfig(&jetstream.ConsumerConfig{}), + }, + bindingEncoding: binding.EncodingStructured, + }, + }, + { + name: "ordered consumer - structured", + args: args{ + opts: []ce_nats.ProtocolOption{ + ce_nats.WithOrderedConsumerConfig(&jetstream.OrderedConsumerConfig{}), + }, + bindingEncoding: binding.EncodingStructured, + }, + }, + { + name: "regular consumer - binary", + args: args{ + opts: []ce_nats.ProtocolOption{ + ce_nats.WithConsumerConfig(&jetstream.ConsumerConfig{}), + }, + bindingEncoding: binding.EncodingBinary, + }, + }, { + name: "ordered consumer - binary", + args: args{ + opts: []ce_nats.ProtocolOption{ + ce_nats.WithOrderedConsumerConfig(&jetstream.OrderedConsumerConfig{}), + }, + bindingEncoding: binding.EncodingBinary, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + cleanup, s, r := testProtocol(ctx, t, conn, tt.args.opts...) + defer cleanup() + EachEvent(t, Events(), func(t *testing.T, eventIn event.Event) { + eventIn = ConvertEventExtensionsToString(t, eventIn) + + var in binding.Message + switch tt.args.bindingEncoding { + case binding.EncodingStructured: + in = MustCreateMockStructuredMessage(t, eventIn) + case binding.EncodingBinary: + in = MustCreateMockBinaryMessage(eventIn) + } + + test.SendReceive(t, binding.WithPreferredEventEncoding(context.TODO(), tt.args.bindingEncoding), in, s, r, func(out binding.Message) { + eventOut := MustToEvent(t, context.Background(), out) + assert.Equal(t, tt.args.bindingEncoding, out.ReadEncoding()) + AssertEventEquals(t, eventIn, ConvertEventExtensionsToString(t, eventOut)) + }) + }) + }) + } +} + +func testConn(t testing.TB) *nats.Conn { + t.Helper() + // STAN connections actually connect to NATS, so the env var is named appropriately + s := os.Getenv("TEST_NATS_SERVER") + if s == "" { + s = "nats://localhost:4222" + } + + conn, err := nats.Connect(s) + if err != nil { + t.Skipf("Cannot create STAN client to NATS server [%s]: %v", s, err) + } + + return conn +} + +func testProtocol(ctx context.Context, t testing.TB, natsConn *nats.Conn, opts ...ce_nats.ProtocolOption) (func(), bindings.Sender, + bindings.Receiver) { + // STAN connections actually connect to NATS, so the env var is named appropriately + s := os.Getenv("TEST_NATS_SERVER") + if s == "" { + s = "nats://localhost:4222" + } + + stream := "test-ce-client-" + uuid.New().String() + subject := stream + ".test" + + var js jetstream.JetStream + var err error + js, err = jetstream.New(natsConn) + require.NoError(t, err) + + streamConfig := jetstream.StreamConfig{Name: stream, Subjects: []string{subject}} + _, err = js.CreateOrUpdateStream(ctx, streamConfig) + require.NoError(t, err) + + // use NewProtocol rather than individual Consumer and Sender since this gives us more coverage + p, err := ce_nats.NewProtocol(ctx, s, subject, ce_nats.NatsOptions(), opts...) + require.NoError(t, err) + + setConsumerConfig(p, subject) + + go func() { + require.NoError(t, p.OpenInbound(context.TODO())) + }() + + return func() { + err = p.Close(context.TODO()) + require.NoError(t, err) + }, p.Sender, p.Consumer +} + +func setConsumerConfig(p *ce_nats.Protocol, subject string) { + if p.Consumer.OrderedConsumerConfig != nil { + p.Consumer.OrderedConsumerConfig.FilterSubjects = []string{subject} + } else if p.Consumer.ConsumerConfig != nil { + p.Consumer.ConsumerConfig.FilterSubjects = []string{subject} + } else { + p.Consumer.ConsumerConfig = &jetstream.ConsumerConfig{ + FilterSubjects: []string{subject}, + } + } +} + +func BenchmarkSendReceive(b *testing.B) { + ctx := context.Background() + conn := testConn(b) + defer conn.Close() + c, s, r := testProtocol(ctx, b, conn) + defer c() // Cleanup + test.BenchmarkSendReceive(b, s, r) +}