diff --git a/pkg/cloudevents/transport/pubsub/codec.go b/pkg/cloudevents/transport/pubsub/codec.go index ab9249c02..951b991da 100644 --- a/pkg/cloudevents/transport/pubsub/codec.go +++ b/pkg/cloudevents/transport/pubsub/codec.go @@ -2,13 +2,10 @@ package pubsub import ( "context" - "encoding/json" "fmt" - "strings" "github.com/cloudevents/sdk-go/pkg/cloudevents" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" - "github.com/cloudevents/sdk-go/pkg/cloudevents/types" ) type Codec struct { @@ -18,6 +15,7 @@ type Codec struct { DefaultEncodingSelectionFn EncodingSelector v03 *CodecV03 + v1 *CodecV1 } const ( @@ -28,15 +26,16 @@ var _ transport.Codec = (*Codec)(nil) func (c *Codec) Encode(ctx context.Context, e cloudevents.Event) (transport.Message, error) { switch c.Encoding { - case Default: - fallthrough - case BinaryV03: - return c.encodeBinary(ctx, e) - case StructuredV03: + case Default, BinaryV03, StructuredV03: if c.v03 == nil { c.v03 = &CodecV03{Encoding: c.Encoding} } return c.v03.Encode(ctx, e) + case StructuredV1: + if c.v1 == nil { + c.v1 = &CodecV1{Encoding: c.Encoding} + } + return c.v1.Encode(ctx, e) default: return nil, transport.NewErrMessageEncodingUnknown("wrapper", TransportName) } @@ -44,229 +43,21 @@ func (c *Codec) Encode(ctx context.Context, e cloudevents.Event) (transport.Mess func (c *Codec) Decode(ctx context.Context, msg transport.Message) (*cloudevents.Event, error) { switch encoding := c.inspectEncoding(ctx, msg); encoding { - case BinaryV03: - event := cloudevents.New(cloudevents.CloudEventsVersionV03) - return c.decodeBinary(ctx, msg, &event) - case StructuredV03: + case BinaryV03, StructuredV03: if c.v03 == nil { c.v03 = &CodecV03{Encoding: encoding} } return c.v03.Decode(ctx, msg) + case BinaryV1, StructuredV1: + if c.v1 == nil { + c.v1 = &CodecV1{Encoding: encoding} + } + return c.v1.Decode(ctx, msg) default: return nil, fmt.Errorf("unknown encoding: %s", encoding) } } -func (c Codec) encodeBinary(ctx context.Context, e cloudevents.Event) (transport.Message, error) { - attributes, err := c.toAttributes(e) - if err != nil { - return nil, err - } - data, err := e.DataBytes() - if err != nil { - return nil, err - } - - msg := &Message{ - Attributes: attributes, - Data: data, - } - - return msg, nil -} - -func (c Codec) toAttributes(e cloudevents.Event) (map[string]string, error) { - a := make(map[string]string) - a[prefix+"specversion"] = e.SpecVersion() - a[prefix+"type"] = e.Type() - a[prefix+"source"] = e.Source() - a[prefix+"id"] = e.ID() - if !e.Time().IsZero() { - t := types.Timestamp{Time: e.Time()} // TODO: change e.Time() to return string so I don't have to do this. - a[prefix+"time"] = t.String() - } - if e.DataSchema() != "" { - a[prefix+"schemaurl"] = e.DataSchema() - } - - if e.DataContentType() != "" { - a[prefix+"datacontenttype"] = e.DataContentType() - } else { - a[prefix+"datacontenttype"] = cloudevents.ApplicationJSON - } - - if e.Subject() != "" { - a[prefix+"subject"] = e.Subject() - } - - if e.DeprecatedDataContentEncoding() != "" { - a[prefix+"datacontentencoding"] = e.DeprecatedDataContentEncoding() - } - - for k, v := range e.Extensions() { - if mapVal, ok := v.(map[string]interface{}); ok { - for subkey, subval := range mapVal { - encoded, err := json.Marshal(subval) - if err != nil { - return nil, err - } - a[prefix+k+"-"+subkey] = string(encoded) - } - continue - } - if s, ok := v.(string); ok { - a[prefix+k] = s - continue - } - encoded, err := json.Marshal(v) - if err != nil { - return nil, err - } - a[prefix+k] = string(encoded) - } - - return a, nil -} - -func (c Codec) decodeBinary(ctx context.Context, msg transport.Message, event *cloudevents.Event) (*cloudevents.Event, error) { - m, ok := msg.(*Message) - if !ok { - return nil, fmt.Errorf("failed to convert transport.Message to pubsub.Message") - } - err := c.fromAttributes(m.Attributes, event) - if err != nil { - return nil, err - } - var data interface{} - if len(m.Data) > 0 { - data = m.Data - } - event.Data = data - event.DataEncoded = true - return event, nil -} - -func (c Codec) fromAttributes(a map[string]string, event *cloudevents.Event) error { - // Normalize attributes. - for k, v := range a { - ck := strings.ToLower(k) - if k != ck { - delete(a, k) - a[ck] = v - } - } - - ec := event.Context - - if sv := a[prefix+"specversion"]; sv != "" { - if err := ec.SetSpecVersion(sv); err != nil { - return err - } - } - delete(a, prefix+"specversion") - - if id := a[prefix+"id"]; id != "" { - if err := ec.SetID(id); err != nil { - return err - } - } - delete(a, prefix+"id") - - if t := a[prefix+"type"]; t != "" { - if err := ec.SetType(t); err != nil { - return err - } - } - delete(a, prefix+"type") - - if s := a[prefix+"source"]; s != "" { - if err := ec.SetSource(s); err != nil { - return err - } - } - delete(a, prefix+"source") - - if t := a[prefix+"time"]; t != "" { - if timestamp, err := types.ParseTimestamp(t); err != nil { - return err - } else if err := ec.SetTime(timestamp.Time); err != nil { - return err - } - } - delete(a, prefix+"time") - - if s := a[prefix+"schemaurl"]; s != "" { - if err := ec.SetDataSchema(s); err != nil { - return err - } - } - delete(a, prefix+"schemaurl") - - if s := a[prefix+"subject"]; s != "" { - if err := ec.SetSubject(s); err != nil { - return err - } - } - delete(a, prefix+"subject") - - if s := a[prefix+"datacontenttype"]; s != "" { - if err := ec.SetDataContentType(s); err != nil { - return err - } - } - delete(a, prefix+"datacontenttype") - - if s := a[prefix+"datacontentencoding"]; s != "" { - if err := ec.DeprecatedSetDataContentEncoding(s); err != nil { - return err - } - } - delete(a, prefix+"datacontentencoding") - - // At this point, we have deleted all the known headers. - // Everything left is assumed to be an extension. - - extensions := make(map[string]interface{}) - for k, v := range a { - if len(k) > len(prefix) && strings.EqualFold(k[:len(prefix)], prefix) { - ak := strings.ToLower(k[len(prefix):]) - if i := strings.Index(ak, "-"); i > 0 { - // attrib-key - attrib := ak[:i] - key := ak[(i + 1):] - if xv, ok := extensions[attrib]; ok { - if m, ok := xv.(map[string]interface{}); ok { - m[key] = v - continue - } - // TODO: revisit how we want to bubble errors up. - return fmt.Errorf("failed to process map type extension") - } else { - m := make(map[string]interface{}) - m[key] = v - extensions[attrib] = m - } - } else { - // key - var tmp interface{} - if err := json.Unmarshal([]byte(v), &tmp); err == nil { - extensions[ak] = tmp - } else { - // If we can't unmarshal the data, treat it as a string. - extensions[ak] = v - } - } - } - } - event.Context = ec - if len(extensions) > 0 { - for k, v := range extensions { - event.SetExtension(k, v) - } - } - return nil -} - func (c *Codec) inspectEncoding(ctx context.Context, msg transport.Message) Encoding { if c.v03 == nil { c.v03 = &CodecV03{Encoding: c.Encoding} diff --git a/pkg/cloudevents/transport/pubsub/codec_v03.go b/pkg/cloudevents/transport/pubsub/codec_v03.go index 78f506ddf..da90cdc38 100644 --- a/pkg/cloudevents/transport/pubsub/codec_v03.go +++ b/pkg/cloudevents/transport/pubsub/codec_v03.go @@ -2,10 +2,13 @@ package pubsub import ( "context" + "encoding/json" "fmt" + "strings" "github.com/cloudevents/sdk-go/pkg/cloudevents" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" + "github.com/cloudevents/sdk-go/pkg/cloudevents/types" ) const ( @@ -22,10 +25,10 @@ var _ transport.Codec = (*CodecV03)(nil) func (v CodecV03) Encode(ctx context.Context, e cloudevents.Event) (transport.Message, error) { switch v.Encoding { - case Default: - fallthrough - case StructuredV03: + case Default, StructuredV03: return v.encodeStructured(ctx, e) + case BinaryV03: + return v.encodeBinary(ctx, e) default: return nil, fmt.Errorf("unknown encoding: %d", v.Encoding) } @@ -36,6 +39,9 @@ func (v CodecV03) Decode(ctx context.Context, msg transport.Message) (*cloudeven switch v.inspectEncoding(ctx, msg) { case StructuredV03: return v.decodeStructured(ctx, cloudevents.CloudEventsVersionV03, msg) + case BinaryV03: + event := cloudevents.New(cloudevents.CloudEventsVersionV03) + return v.decodeBinary(ctx, msg, &event) default: return nil, transport.NewErrMessageEncodingUnknown("v03", TransportName) } @@ -55,3 +61,213 @@ func (v CodecV03) inspectEncoding(ctx context.Context, msg transport.Message) En } return BinaryV03 } + +func (v CodecV03) encodeBinary(ctx context.Context, e cloudevents.Event) (transport.Message, error) { + attributes, err := v.toAttributes(e) + if err != nil { + return nil, err + } + data, err := e.DataBytes() + if err != nil { + return nil, err + } + + msg := &Message{ + Attributes: attributes, + Data: data, + } + + return msg, nil +} + +func (v CodecV03) toAttributes(e cloudevents.Event) (map[string]string, error) { + a := make(map[string]string) + a[prefix+"specversion"] = e.SpecVersion() + a[prefix+"type"] = e.Type() + a[prefix+"source"] = e.Source() + a[prefix+"id"] = e.ID() + if !e.Time().IsZero() { + t := types.Timestamp{Time: e.Time()} // TODO: change e.Time() to return string so I don't have to do this. + a[prefix+"time"] = t.String() + } + if e.DataSchema() != "" { + a[prefix+"schemaurl"] = e.DataSchema() + } + + if e.DataContentType() != "" { + a[prefix+"datacontenttype"] = e.DataContentType() + } else { + a[prefix+"datacontenttype"] = cloudevents.ApplicationJSON + } + + if e.Subject() != "" { + a[prefix+"subject"] = e.Subject() + } + + if e.DeprecatedDataContentEncoding() != "" { + a[prefix+"datacontentencoding"] = e.DeprecatedDataContentEncoding() + } + + for k, v := range e.Extensions() { + if mapVal, ok := v.(map[string]interface{}); ok { + for subkey, subval := range mapVal { + encoded, err := json.Marshal(subval) + if err != nil { + return nil, err + } + a[prefix+k+"-"+subkey] = string(encoded) + } + continue + } + if s, ok := v.(string); ok { + a[prefix+k] = s + continue + } + encoded, err := json.Marshal(v) + if err != nil { + return nil, err + } + a[prefix+k] = string(encoded) + } + + return a, nil +} + +func (v CodecV03) decodeBinary(ctx context.Context, msg transport.Message, event *cloudevents.Event) (*cloudevents.Event, error) { + m, ok := msg.(*Message) + if !ok { + return nil, fmt.Errorf("failed to convert transport.Message to pubsub.Message") + } + err := v.fromAttributes(m.Attributes, event) + if err != nil { + return nil, err + } + var data interface{} + if len(m.Data) > 0 { + data = m.Data + } + event.Data = data + event.DataEncoded = true + return event, nil +} + +func (v CodecV03) fromAttributes(a map[string]string, event *cloudevents.Event) error { + // Normalize attributes. + for k, v := range a { + ck := strings.ToLower(k) + if k != ck { + delete(a, k) + a[ck] = v + } + } + + ec := event.Context + + if sv := a[prefix+"specversion"]; sv != "" { + if err := ec.SetSpecVersion(sv); err != nil { + return err + } + } + delete(a, prefix+"specversion") + + if id := a[prefix+"id"]; id != "" { + if err := ec.SetID(id); err != nil { + return err + } + } + delete(a, prefix+"id") + + if t := a[prefix+"type"]; t != "" { + if err := ec.SetType(t); err != nil { + return err + } + } + delete(a, prefix+"type") + + if s := a[prefix+"source"]; s != "" { + if err := ec.SetSource(s); err != nil { + return err + } + } + delete(a, prefix+"source") + + if t := a[prefix+"time"]; t != "" { + if timestamp, err := types.ParseTimestamp(t); err != nil { + return err + } else if err := ec.SetTime(timestamp.Time); err != nil { + return err + } + } + delete(a, prefix+"time") + + if s := a[prefix+"schemaurl"]; s != "" { + if err := ec.SetDataSchema(s); err != nil { + return err + } + } + delete(a, prefix+"schemaurl") + + if s := a[prefix+"subject"]; s != "" { + if err := ec.SetSubject(s); err != nil { + return err + } + } + delete(a, prefix+"subject") + + if s := a[prefix+"datacontenttype"]; s != "" { + if err := ec.SetDataContentType(s); err != nil { + return err + } + } + delete(a, prefix+"datacontenttype") + + if s := a[prefix+"datacontentencoding"]; s != "" { + if err := ec.DeprecatedSetDataContentEncoding(s); err != nil { + return err + } + } + delete(a, prefix+"datacontentencoding") + + // At this point, we have deleted all the known headers. + // Everything left is assumed to be an extension. + + extensions := make(map[string]interface{}) + for k, v := range a { + if len(k) > len(prefix) && strings.EqualFold(k[:len(prefix)], prefix) { + ak := strings.ToLower(k[len(prefix):]) + if i := strings.Index(ak, "-"); i > 0 { + // attrib-key + attrib := ak[:i] + key := ak[(i + 1):] + if xv, ok := extensions[attrib]; ok { + if m, ok := xv.(map[string]interface{}); ok { + m[key] = v + continue + } + // TODO: revisit how we want to bubble errors up. + return fmt.Errorf("failed to process map type extension") + } else { + m := make(map[string]interface{}) + m[key] = v + extensions[attrib] = m + } + } else { + // key + var tmp interface{} + if err := json.Unmarshal([]byte(v), &tmp); err == nil { + extensions[ak] = tmp + } else { + // If we can't unmarshal the data, treat it as a string. + extensions[ak] = v + } + } + } + } + event.Context = ec + if len(extensions) > 0 { + for k, v := range extensions { + event.SetExtension(k, v) + } + } + return nil +} diff --git a/pkg/cloudevents/transport/pubsub/codec_v1.go b/pkg/cloudevents/transport/pubsub/codec_v1.go new file mode 100644 index 000000000..e7b3ef366 --- /dev/null +++ b/pkg/cloudevents/transport/pubsub/codec_v1.go @@ -0,0 +1,269 @@ +package pubsub + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/cloudevents/sdk-go/pkg/cloudevents" + "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" + "github.com/cloudevents/sdk-go/pkg/cloudevents/types" +) + +type CodecV1 struct { + CodecStructured + + Encoding Encoding +} + +var _ transport.Codec = (*CodecV1)(nil) + +func (v CodecV1) Encode(ctx context.Context, e cloudevents.Event) (transport.Message, error) { + switch v.Encoding { + case Default, StructuredV1: + return v.encodeStructured(ctx, e) + case BinaryV1: + return v.encodeBinary(ctx, e) + default: + return nil, fmt.Errorf("unknown encoding: %d", v.Encoding) + } +} + +func (v CodecV1) Decode(ctx context.Context, msg transport.Message) (*cloudevents.Event, error) { + // only structured is supported as of v0.3 + switch v.inspectEncoding(ctx, msg) { + case StructuredV1: + return v.decodeStructured(ctx, cloudevents.CloudEventsVersionV1, msg) + case BinaryV1: + event := cloudevents.New(cloudevents.CloudEventsVersionV1) + return v.decodeBinary(ctx, msg, &event) + default: + return nil, transport.NewErrMessageEncodingUnknown("V1", TransportName) + } +} + +func (v CodecV1) inspectEncoding(ctx context.Context, msg transport.Message) Encoding { + version := msg.CloudEventsVersion() + if version != cloudevents.CloudEventsVersionV1 { + return Unknown + } + m, ok := msg.(*Message) + if !ok { + return Unknown + } + if m.Attributes[StructuredContentType] == cloudevents.ApplicationCloudEventsJSON { + return StructuredV1 + } + return BinaryV1 +} + +func (v CodecV1) encodeBinary(ctx context.Context, e cloudevents.Event) (transport.Message, error) { + attributes, err := v.toAttributes(e) + if err != nil { + return nil, err + } + data, err := e.DataBytes() + if err != nil { + return nil, err + } + + msg := &Message{ + Attributes: attributes, + Data: data, + } + + return msg, nil +} + +func (v CodecV1) toAttributes(e cloudevents.Event) (map[string]string, error) { + a := make(map[string]string) + a[prefix+"specversion"] = e.SpecVersion() + a[prefix+"type"] = e.Type() + a[prefix+"source"] = e.Source() + a[prefix+"id"] = e.ID() + if !e.Time().IsZero() { + t := types.Timestamp{Time: e.Time()} // TODO: change e.Time() to return string so I don't have to do this. + a[prefix+"time"] = t.String() + } + if e.DataSchema() != "" { + a[prefix+"dataschema"] = e.DataSchema() + } + + if e.DataContentType() != "" { + a[prefix+"datacontenttype"] = e.DataContentType() + } else { + a[prefix+"datacontenttype"] = cloudevents.ApplicationJSON + } + + if e.Subject() != "" { + a[prefix+"subject"] = e.Subject() + } + + if e.DeprecatedDataContentEncoding() != "" { + a[prefix+"datacontentencoding"] = e.DeprecatedDataContentEncoding() + } + + for k, v := range e.Extensions() { + if mapVal, ok := v.(map[string]interface{}); ok { + for subkey, subval := range mapVal { + encoded, err := json.Marshal(subval) + if err != nil { + return nil, err + } + a[prefix+k+"-"+subkey] = string(encoded) + } + continue + } + if s, ok := v.(string); ok { + a[prefix+k] = s + continue + } + encoded, err := json.Marshal(v) + if err != nil { + return nil, err + } + a[prefix+k] = string(encoded) + } + + return a, nil +} + +func (v CodecV1) decodeBinary(ctx context.Context, msg transport.Message, event *cloudevents.Event) (*cloudevents.Event, error) { + m, ok := msg.(*Message) + if !ok { + return nil, fmt.Errorf("failed to convert transport.Message to pubsub.Message") + } + err := v.fromAttributes(m.Attributes, event) + if err != nil { + return nil, err + } + var data interface{} + if len(m.Data) > 0 { + data = m.Data + } + event.Data = data + event.DataEncoded = true + return event, nil +} + +func (v CodecV1) fromAttributes(a map[string]string, event *cloudevents.Event) error { + // Normalize attributes. + for k, v := range a { + ck := strings.ToLower(k) + if k != ck { + delete(a, k) + a[ck] = v + } + } + + ec := event.Context + + if sv := a[prefix+"specversion"]; sv != "" { + if err := ec.SetSpecVersion(sv); err != nil { + return err + } + } + delete(a, prefix+"specversion") + + if id := a[prefix+"id"]; id != "" { + if err := ec.SetID(id); err != nil { + return err + } + } + delete(a, prefix+"id") + + if t := a[prefix+"type"]; t != "" { + if err := ec.SetType(t); err != nil { + return err + } + } + delete(a, prefix+"type") + + if s := a[prefix+"source"]; s != "" { + if err := ec.SetSource(s); err != nil { + return err + } + } + delete(a, prefix+"source") + + if t := a[prefix+"time"]; t != "" { + if timestamp, err := types.ParseTimestamp(t); err != nil { + return err + } else if err := ec.SetTime(timestamp.Time); err != nil { + return err + } + } + delete(a, prefix+"time") + + if s := a[prefix+"dataschema"]; s != "" { + if err := ec.SetDataSchema(s); err != nil { + return err + } + } + delete(a, prefix+"dataschema") + + if s := a[prefix+"subject"]; s != "" { + if err := ec.SetSubject(s); err != nil { + return err + } + } + delete(a, prefix+"subject") + + if s := a[prefix+"datacontenttype"]; s != "" { + if err := ec.SetDataContentType(s); err != nil { + return err + } + } + delete(a, prefix+"datacontenttype") + + if s := a[prefix+"datacontentencoding"]; s != "" { + if err := ec.DeprecatedSetDataContentEncoding(s); err != nil { + return err + } + } + delete(a, prefix+"datacontentencoding") + + // At this point, we have deleted all the known headers. + // Everything left is assumed to be an extension. + + extensions := make(map[string]interface{}) + for k, v := range a { + if len(k) > len(prefix) && strings.EqualFold(k[:len(prefix)], prefix) { + ak := strings.ToLower(k[len(prefix):]) + if i := strings.Index(ak, "-"); i > 0 { + // attrib-key + attrib := ak[:i] + key := ak[(i + 1):] + if xv, ok := extensions[attrib]; ok { + if m, ok := xv.(map[string]interface{}); ok { + m[key] = v + continue + } + // TODO: revisit how we want to bubble errors up. + return fmt.Errorf("failed to process map type extension") + } else { + m := make(map[string]interface{}) + m[key] = v + extensions[attrib] = m + } + } else { + // key + var tmp interface{} + if err := json.Unmarshal([]byte(v), &tmp); err == nil { + extensions[ak] = tmp + } else { + // If we can't unmarshal the data, treat it as a string. + extensions[ak] = v + } + } + } + } + event.Context = ec + if len(extensions) > 0 { + for k, v := range extensions { + event.SetExtension(k, v) + } + } + return nil +} diff --git a/pkg/cloudevents/transport/pubsub/codec_v1_test.go b/pkg/cloudevents/transport/pubsub/codec_v1_test.go new file mode 100644 index 000000000..8b75d2547 --- /dev/null +++ b/pkg/cloudevents/transport/pubsub/codec_v1_test.go @@ -0,0 +1,282 @@ +package pubsub_test + +import ( + "context" + "net/url" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + + "github.com/cloudevents/sdk-go/pkg/cloudevents" + "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" + "github.com/cloudevents/sdk-go/pkg/cloudevents/types" +) + +func TestCodecV1_Encode(t *testing.T) { + now := types.Timestamp{Time: time.Now().UTC()} + sourceUrl, _ := url.Parse("http://example.com/source") + source := &types.URIRef{URL: *sourceUrl} + + schemaUrl, _ := url.Parse("http://example.com/schema") + schema := &types.URI{URL: *schemaUrl} + + testCases := map[string]struct { + codec pubsub.CodecV1 + event cloudevents.Event + want *pubsub.Message + wantErr error + }{ + "simple v1.0 default": { + codec: pubsub.CodecV1{}, + event: cloudevents.Event{ + Context: cloudevents.EventContextV1{ + Type: "com.example.test", + Source: *source, + ID: "ABC-123", + }.AsV1(), + }, + want: &pubsub.Message{ + Attributes: map[string]string{ + "Content-Type": cloudevents.ApplicationCloudEventsJSON, + }, + Data: func() []byte { + body := map[string]interface{}{ + "specversion": "1.0", + "id": "ABC-123", + "type": "com.example.test", + "source": "http://example.com/source", + } + return toBytes(body) + }(), + }, + }, + "full v1.0 default": { + codec: pubsub.CodecV1{}, + event: cloudevents.Event{ + Context: cloudevents.EventContextV1{ + ID: "ABC-123", + Time: &now, + Type: "com.example.test", + DataSchema: schema, + DataContentType: cloudevents.StringOfApplicationJSON(), + Source: *source, + Extensions: map[string]interface{}{ + "test": "extended", + }, + }.AsV1(), + Data: map[string]interface{}{ + "hello": "world", + }, + }, + want: &pubsub.Message{ + Attributes: map[string]string{ + "Content-Type": cloudevents.ApplicationCloudEventsJSON, + }, + Data: func() []byte { + body := map[string]interface{}{ + "specversion": "1.0", + "datacontenttype": "application/json", + "data": map[string]interface{}{ + "hello": "world", + }, + "id": "ABC-123", + "time": now, + "type": "com.example.test", + "test": "extended", + "dataschema": "http://example.com/schema", + "source": "http://example.com/source", + } + return toBytes(body) + }(), + }, + }, + "simple v1.0 structured": { + codec: pubsub.CodecV1{Encoding: pubsub.StructuredV1}, + event: cloudevents.Event{ + Context: cloudevents.EventContextV1{ + Type: "com.example.test", + Source: *source, + ID: "ABC-123", + }.AsV1(), + }, + want: &pubsub.Message{ + Attributes: map[string]string{ + "Content-Type": cloudevents.ApplicationCloudEventsJSON, + }, + Data: func() []byte { + body := map[string]interface{}{ + "specversion": "1.0", + "id": "ABC-123", + "type": "com.example.test", + "source": "http://example.com/source", + } + return toBytes(body) + }(), + }, + }, + "full v1.0 structured": { + codec: pubsub.CodecV1{Encoding: pubsub.StructuredV1}, + event: cloudevents.Event{ + Context: cloudevents.EventContextV1{ + ID: "ABC-123", + Time: &now, + Type: "com.example.test", + DataSchema: schema, + DataContentType: cloudevents.StringOfApplicationJSON(), + Source: *source, + Extensions: map[string]interface{}{ + "test": "extended", + }, + }.AsV1(), + Data: map[string]interface{}{ + "hello": "world", + }, + }, + want: &pubsub.Message{ + Attributes: map[string]string{ + "Content-Type": cloudevents.ApplicationCloudEventsJSON, + }, + Data: func() []byte { + body := map[string]interface{}{ + "specversion": "1.0", + "datacontenttype": "application/json", + "data": map[string]interface{}{ + "hello": "world", + }, + "id": "ABC-123", + "time": now, + "type": "com.example.test", + "test": "extended", + "dataschema": "http://example.com/schema", + "source": "http://example.com/source", + } + return toBytes(body) + }(), + }, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + + got, err := tc.codec.Encode(context.TODO(), tc.event) + + if tc.wantErr != nil || err != nil { + if diff := cmp.Diff(tc.wantErr, err); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + if diff := cmp.Diff(tc.want, got); diff != "" { + + if msg, ok := got.(*pubsub.Message); ok { + // It is hard to read the byte dump + want := string(tc.want.Data) + got := string(msg.Data) + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected message body (-want, +got) = %v", diff) + return + } + } + + t.Errorf("unexpected message (-want, +got) = %v", diff) + } + }) + } +} + +func TestCodecV1_Decode(t *testing.T) { + now := types.Timestamp{Time: time.Now()} + sourceUrl, _ := url.Parse("http://example.com/source") + source := &types.URIRef{URL: *sourceUrl} + + schemaUrl, _ := url.Parse("http://example.com/schema") + schema := &types.URI{URL: *schemaUrl} + + testCases := map[string]struct { + codec pubsub.CodecV1 + msg *pubsub.Message + want *cloudevents.Event + wantErr error + }{ + "simple v1.0 structured": { + codec: pubsub.CodecV1{}, + msg: &pubsub.Message{ + Attributes: map[string]string{ + "Content-Type": cloudevents.ApplicationCloudEventsJSON, + }, + Data: toBytes(map[string]interface{}{ + "specversion": "1.0", + "id": "ABC-123", + "type": "com.example.test", + "source": "http://example.com/source", + }), + }, + want: &cloudevents.Event{ + Context: &cloudevents.EventContextV1{ + SpecVersion: cloudevents.CloudEventsVersionV1, + Type: "com.example.test", + Source: *source, + ID: "ABC-123", + }, + }, + }, + "full v1.0 structured": { + codec: pubsub.CodecV1{}, + msg: &pubsub.Message{ + Attributes: map[string]string{ + "Content-Type": cloudevents.ApplicationCloudEventsJSON, + }, + Data: toBytes(map[string]interface{}{ + "specversion": "1.0", + "datacontenttype": "application/json", + "data": map[string]interface{}{ + "hello": "world", + }, + "id": "ABC-123", + "time": now, + "type": "com.example.test", + "test": "extended", + "dataschema": "http://example.com/schema", + "source": "http://example.com/source", + }), + }, + want: &cloudevents.Event{ + Context: &cloudevents.EventContextV1{ + SpecVersion: cloudevents.CloudEventsVersionV1, + ID: "ABC-123", + Time: &now, + Type: "com.example.test", + DataSchema: schema, + DataContentType: cloudevents.StringOfApplicationJSON(), + Source: *source, + Extensions: map[string]interface{}{ + "test": "extended", + }, + }, + Data: toBytes(map[string]interface{}{ + "hello": "world", + }), + DataEncoded: true, + }, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + + got, err := tc.codec.Decode(context.TODO(), tc.msg) + + if tc.wantErr != nil || err != nil { + if diff := cmp.Diff(tc.wantErr, err); diff != "" { + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Errorf("unexpected event (-want, +got) = %v", diff) + } + }) + } +} diff --git a/pkg/cloudevents/transport/pubsub/encoding.go b/pkg/cloudevents/transport/pubsub/encoding.go index 7f6ef141c..2bdf56496 100644 --- a/pkg/cloudevents/transport/pubsub/encoding.go +++ b/pkg/cloudevents/transport/pubsub/encoding.go @@ -16,8 +16,12 @@ const ( Default Encoding = iota // BinaryV03 is Binary CloudEvents spec v0.3. BinaryV03 + // BinaryV1 is Binary CloudEvents spec v1.0. + BinaryV1 // StructuredV03 is Structured CloudEvents spec v0.3. StructuredV03 + // StructuredV1 is Structured CloudEvents spec v1.0. + StructuredV1 // Unknown is unknown. Unknown ) @@ -28,6 +32,8 @@ func DefaultBinaryEncodingSelectionStrategy(ctx context.Context, e cloudevents.E switch e.SpecVersion() { case cloudevents.CloudEventsVersionV01, cloudevents.CloudEventsVersionV02, cloudevents.CloudEventsVersionV03: return BinaryV03 + case cloudevents.CloudEventsVersionV1: + return BinaryV1 } // Unknown version, return Default. return Default @@ -39,6 +45,8 @@ func DefaultStructuredEncodingSelectionStrategy(ctx context.Context, e cloudeven switch e.SpecVersion() { case cloudevents.CloudEventsVersionV01, cloudevents.CloudEventsVersionV02, cloudevents.CloudEventsVersionV03: return StructuredV03 + case cloudevents.CloudEventsVersionV1: + return StructuredV1 } // Unknown version, return Default. return Default @@ -51,11 +59,11 @@ func (e Encoding) String() string { return "Default Encoding " + e.Version() // Binary - case BinaryV03: + case BinaryV03, BinaryV1: return "Binary Encoding " + e.Version() // Structured - case StructuredV03: + case StructuredV03, StructuredV1: return "Structured Encoding " + e.Version() default: @@ -68,13 +76,14 @@ func (e Encoding) Version() string { switch e { // Version 0.2 - case Default: // <-- Move when a new default is wanted. - fallthrough - // Version 0.3 - case StructuredV03: + case Default, BinaryV03, StructuredV03: return "v0.3" + // Version 1.0 + case BinaryV1, StructuredV1: + return "v1.0" + // Unknown default: return "Unknown"