-
Notifications
You must be signed in to change notification settings - Fork 219
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add v3 version of nats jetstream protocol with integration tests and …
…samples Signed-off-by: stephen-totty-hpe <stephen.totty@hpe.com>
- Loading branch information
1 parent
682f3a9
commit 16961e8
Showing
17 changed files
with
1,543 additions
and
19 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
module github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3 | ||
|
||
go 1.18 | ||
|
||
replace github.com/cloudevents/sdk-go/v2 => ../../../v2 | ||
|
||
require ( | ||
github.com/cloudevents/sdk-go/v2 v2.15.2 | ||
github.com/nats-io/nats.go v1.37.0 | ||
) | ||
|
||
require ( | ||
github.com/davecgh/go-spew v1.1.1 // indirect | ||
github.com/google/go-cmp v0.5.0 // indirect | ||
github.com/json-iterator/go v1.1.12 // indirect | ||
github.com/klauspost/compress v1.17.9 // indirect | ||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect | ||
github.com/modern-go/reflect2 v1.0.2 // indirect | ||
github.com/nats-io/nkeys v0.4.7 // indirect | ||
github.com/nats-io/nuid v1.0.1 // indirect | ||
github.com/pmezard/go-difflib v1.0.0 // indirect | ||
github.com/stretchr/testify v1.8.0 // indirect | ||
golang.org/x/crypto v0.27.0 // indirect | ||
golang.org/x/sys v0.25.0 // indirect | ||
golang.org/x/text v0.18.0 // indirect | ||
gopkg.in/yaml.v3 v3.0.1 // indirect | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= | ||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= | ||
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= | ||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= | ||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= | ||
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= | ||
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= | ||
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= | ||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= | ||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= | ||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= | ||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= | ||
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= | ||
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= | ||
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= | ||
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= | ||
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= | ||
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= | ||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= | ||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= | ||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= | ||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= | ||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= | ||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= | ||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= | ||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= | ||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= | ||
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= | ||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= | ||
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= | ||
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= | ||
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= | ||
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= | ||
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= | ||
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= | ||
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= | ||
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= | ||
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= | ||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= | ||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | ||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= | ||
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= | ||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= | ||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= | ||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
/* | ||
Copyright 2021 The CloudEvents Authors | ||
SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package nats_jetstream | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"fmt" | ||
"strings" | ||
|
||
"github.com/nats-io/nats.go/jetstream" | ||
|
||
"github.com/cloudevents/sdk-go/v2/binding" | ||
"github.com/cloudevents/sdk-go/v2/binding/format" | ||
"github.com/cloudevents/sdk-go/v2/binding/spec" | ||
) | ||
|
||
const ( | ||
// see https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/nats-protocol-binding.md | ||
prefix = "ce-" | ||
contentTypeHeader = "content-type" | ||
) | ||
|
||
var specs = spec.WithPrefix(prefix) | ||
|
||
// Message implements binding.Message by wrapping an jetstream.Msg. | ||
// This message *can* be read several times safely | ||
type Message struct { | ||
Msg jetstream.Msg | ||
encoding binding.Encoding | ||
} | ||
|
||
// NewMessage wraps an *nats.Msg in a binding.Message. | ||
// The returned message *can* be read several times safely | ||
// The default encoding returned is EncodingStructured unless the NATS message contains a specversion header. | ||
func NewMessage(msg jetstream.Msg) *Message { | ||
encoding := binding.EncodingStructured | ||
if msg.Headers() != nil { | ||
if msg.Headers().Get(specs.PrefixedSpecVersionName()) != "" { | ||
encoding = binding.EncodingBinary | ||
} | ||
} | ||
return &Message{Msg: msg, encoding: encoding} | ||
} | ||
|
||
var _ binding.Message = (*Message)(nil) | ||
|
||
// ReadEncoding return the type of the message Encoding. | ||
func (m *Message) ReadEncoding() binding.Encoding { | ||
return m.encoding | ||
} | ||
|
||
// ReadStructured transfers a structured-mode event to a StructuredWriter. | ||
func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error { | ||
if m.encoding != binding.EncodingStructured { | ||
return binding.ErrNotStructured | ||
} | ||
return encoder.SetStructuredEvent(ctx, format.JSON, bytes.NewReader(m.Msg.Data())) | ||
} | ||
|
||
// ReadBinary transfers a binary-mode event to an BinaryWriter. | ||
func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error { | ||
if m.encoding != binding.EncodingBinary { | ||
return binding.ErrNotBinary | ||
} | ||
|
||
version := m.GetVersion() | ||
if version == nil { | ||
return binding.ErrNotBinary | ||
} | ||
|
||
var err error | ||
for k, v := range m.Msg.Headers() { | ||
headerValue := v[0] | ||
if strings.HasPrefix(k, prefix) { | ||
attr := version.Attribute(k) | ||
if attr != nil { | ||
err = encoder.SetAttribute(attr, headerValue) | ||
} else { | ||
err = encoder.SetExtension(strings.TrimPrefix(k, prefix), headerValue) | ||
} | ||
} else if k == contentTypeHeader { | ||
err = encoder.SetAttribute(version.AttributeFromKind(spec.DataContentType), headerValue) | ||
} | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
if m.Msg.Data() != nil { | ||
err = encoder.SetData(bytes.NewBuffer(m.Msg.Data())) | ||
} | ||
|
||
return err | ||
} | ||
|
||
// Finish *must* be called when message from a Receiver can be forgotten by the receiver. | ||
func (m *Message) Finish(err error) error { | ||
return nil | ||
} | ||
|
||
// GetAttribute implements binding.MessageMetadataReader | ||
func (m *Message) GetAttribute(attributeKind spec.Kind) (spec.Attribute, interface{}) { | ||
key := withPrefix(attributeKind.String()) | ||
if m.Msg.Headers() != nil { | ||
headerValue := m.Msg.Headers().Get(key) | ||
if headerValue != "" { | ||
version := m.GetVersion() | ||
return version.Attribute(key), headerValue | ||
} | ||
} | ||
return nil, nil | ||
} | ||
|
||
// GetExtension implements binding.MessageMetadataReader | ||
func (m *Message) GetExtension(name string) interface{} { | ||
key := withPrefix(name) | ||
if m.Msg.Headers() != nil { | ||
headerValue := m.Msg.Headers().Get(key) | ||
if headerValue != "" { | ||
return headerValue | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// GetVersion looks for specVersion header and returns a Version object | ||
func (m *Message) GetVersion() spec.Version { | ||
if m.Msg.Headers() == nil { | ||
return nil | ||
} | ||
versionValue := m.Msg.Headers().Get(specs.PrefixedSpecVersionName()) | ||
if versionValue == "" { | ||
return nil | ||
} | ||
return specs.Version(versionValue) | ||
} | ||
|
||
// withPrefix prepends the prefix to the attribute name | ||
func withPrefix(attributeName string) string { | ||
return fmt.Sprintf("%s%s", prefix, attributeName) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
package nats_jetstream | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"testing" | ||
|
||
"github.com/cloudevents/sdk-go/v2/binding/spec" | ||
bindingtest "github.com/cloudevents/sdk-go/v2/binding/test" | ||
|
||
"github.com/cloudevents/sdk-go/v2/binding" | ||
"github.com/cloudevents/sdk-go/v2/test" | ||
"github.com/nats-io/nats.go" | ||
"github.com/nats-io/nats.go/jetstream" | ||
) | ||
|
||
type jetStreamMsg struct { | ||
jetstream.Msg | ||
msg *nats.Msg | ||
} | ||
|
||
func (j *jetStreamMsg) Data() []byte { return j.msg.Data } | ||
func (j *jetStreamMsg) Headers() nats.Header { return j.msg.Header } | ||
|
||
var ( | ||
outBinaryMessage = bindingtest.MockBinaryMessage{ | ||
Metadata: map[spec.Attribute]interface{}{}, | ||
Extensions: map[string]interface{}{}, | ||
} | ||
outStructMessage = bindingtest.MockStructuredMessage{} | ||
|
||
testEvent = test.FullEvent() | ||
binaryData, _ = json.Marshal(map[string]string{ | ||
"ce_type": testEvent.Type(), | ||
"ce_source": testEvent.Source(), | ||
"ce_id": testEvent.ID(), | ||
"ce_time": test.Timestamp.String(), | ||
"ce_specversion": "1.0", | ||
"ce_dataschema": test.Schema.String(), | ||
"ce_datacontenttype": "text/json", | ||
"ce_subject": "receiverTopic", | ||
"ce_exta": "someext", | ||
}) | ||
structuredConsumerMessage = &jetStreamMsg{ | ||
msg: &nats.Msg{ | ||
Subject: "hello", | ||
Data: binaryData, | ||
}, | ||
} | ||
binaryConsumerMessage = &jetStreamMsg{ | ||
msg: &nats.Msg{ | ||
Subject: "hello", | ||
Data: testEvent.Data(), | ||
Header: nats.Header{ | ||
"ce-type": {testEvent.Type()}, | ||
"ce-source": {testEvent.Source()}, | ||
"ce-id": {testEvent.ID()}, | ||
"ce-time": {test.Timestamp.String()}, | ||
"ce-specversion": {"1.0"}, | ||
"ce-dataschema": {test.Schema.String()}, | ||
"ce-datacontenttype": {"text/json"}, | ||
"ce-subject": {"receiverTopic"}, | ||
"ce-exta": {"someext"}, | ||
}, | ||
}, | ||
} | ||
) | ||
|
||
func TestNewMessage(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
consumerMessage jetstream.Msg | ||
expectedEncoding binding.Encoding | ||
expectedStructuredError error | ||
expectedBinaryError error | ||
}{ | ||
{ | ||
name: "Structured encoding", | ||
consumerMessage: structuredConsumerMessage, | ||
expectedEncoding: binding.EncodingStructured, | ||
expectedStructuredError: nil, | ||
expectedBinaryError: binding.ErrNotBinary, | ||
}, | ||
{ | ||
name: "Binary encoding", | ||
consumerMessage: binaryConsumerMessage, | ||
expectedEncoding: binding.EncodingBinary, | ||
expectedStructuredError: binding.ErrNotStructured, | ||
expectedBinaryError: nil, | ||
}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
got := NewMessage(tt.consumerMessage) | ||
if got == nil { | ||
t.Errorf("Error in NewMessage!") | ||
} | ||
err := got.ReadBinary(context.TODO(), &outBinaryMessage) | ||
if err != tt.expectedBinaryError { | ||
t.Errorf("ReadBinary err:%s", err.Error()) | ||
} | ||
err = got.ReadStructured(context.TODO(), &outStructMessage) | ||
if err != tt.expectedStructuredError { | ||
t.Errorf("ReadStructured err:%s", err.Error()) | ||
} | ||
if got.ReadEncoding() != tt.expectedEncoding { | ||
t.Errorf("ExpectedEncoding %s, while got %s", tt.expectedEncoding, got.ReadEncoding()) | ||
} | ||
}) | ||
} | ||
} |
Oops, something went wrong.