From dc21c3afc9844d3d49edee5209fe6c9bdb1f377e Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Tue, 14 Jan 2025 11:49:28 +0100 Subject: [PATCH] added forward compatibility for old marshaler --- components/cqrs/marshaler_protobuf.go | 21 +- .../marshaler_protobuf_events_new_test.go | 483 ++++++++++++++++++ .../cqrs/marshaler_protobuf_events_test.go | 192 +++---- components/cqrs/marshaler_protobuf_gogo.go | 127 +++++ .../cqrs/marshaler_protobuf_gogo_test.go | 142 +++++ components/cqrs/marshaler_protobuf_test.go | 102 +++- components/cqrs/testdata/events.proto | 30 +- go.mod | 2 + go.sum | 35 ++ 9 files changed, 975 insertions(+), 159 deletions(-) create mode 100644 components/cqrs/marshaler_protobuf_events_new_test.go create mode 100644 components/cqrs/marshaler_protobuf_gogo.go create mode 100644 components/cqrs/marshaler_protobuf_gogo_test.go diff --git a/components/cqrs/marshaler_protobuf.go b/components/cqrs/marshaler_protobuf.go index addcf31c9..8e9f66afb 100644 --- a/components/cqrs/marshaler_protobuf.go +++ b/components/cqrs/marshaler_protobuf.go @@ -10,8 +10,8 @@ import ( "github.com/pkg/errors" ) -// ProtobufMarshaler is the default Protocol Buffers marshaler. -type ProtobufMarshaler struct { +// ProtoMarshaler is the default Protocol Buffers marshaler. +type ProtoMarshaler struct { NewUUID func() string GenerateName func(v interface{}) string } @@ -31,7 +31,7 @@ func (e NoProtoMessageError) Error() string { } // Marshal marshals the given protobuf's message into watermill's Message. -func (m ProtobufMarshaler) Marshal(v interface{}) (*message.Message, error) { +func (m ProtoMarshaler) Marshal(v interface{}) (*message.Message, error) { protoMsg, ok := v.(proto.Message) if !ok { return nil, errors.WithStack(NoProtoMessageError{v}) @@ -51,7 +51,7 @@ func (m ProtobufMarshaler) Marshal(v interface{}) (*message.Message, error) { return msg, nil } -func (m ProtobufMarshaler) newUUID() string { +func (m ProtoMarshaler) newUUID() string { if m.NewUUID != nil { return m.NewUUID() } @@ -61,12 +61,17 @@ func (m ProtobufMarshaler) newUUID() string { } // Unmarshal unmarshals given watermill's Message into protobuf's message. -func (ProtobufMarshaler) Unmarshal(msg *message.Message, v interface{}) (err error) { - return proto.Unmarshal(msg.Payload, v.(proto.Message)) +func (ProtoMarshaler) Unmarshal(msg *message.Message, v interface{}) (err error) { + protoV, ok := v.(proto.Message) + if !ok { + return errors.WithStack(NoProtoMessageError{v}) + } + + return proto.Unmarshal(msg.Payload, protoV) } // Name returns the command or event's name. -func (m ProtobufMarshaler) Name(cmdOrEvent interface{}) string { +func (m ProtoMarshaler) Name(cmdOrEvent interface{}) string { if m.GenerateName != nil { return m.GenerateName(cmdOrEvent) } @@ -75,6 +80,6 @@ func (m ProtobufMarshaler) Name(cmdOrEvent interface{}) string { } // NameFromMessage returns the metadata name value for a given Message. -func (m ProtobufMarshaler) NameFromMessage(msg *message.Message) string { +func (m ProtoMarshaler) NameFromMessage(msg *message.Message) string { return msg.Metadata.Get("name") } diff --git a/components/cqrs/marshaler_protobuf_events_new_test.go b/components/cqrs/marshaler_protobuf_events_new_test.go new file mode 100644 index 000000000..c7fb8e26c --- /dev/null +++ b/components/cqrs/marshaler_protobuf_events_new_test.go @@ -0,0 +1,483 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v4.24.4 +// source: events.proto + +package cqrs_test + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Status int32 + +const ( + Status_STATUS_UNSPECIFIED Status = 0 + Status_ACTIVE Status = 1 + Status_DELETED Status = 2 +) + +// Enum value maps for Status. +var ( + Status_name = map[int32]string{ + 0: "STATUS_UNSPECIFIED", + 1: "ACTIVE", + 2: "DELETED", + } + Status_value = map[string]int32{ + "STATUS_UNSPECIFIED": 0, + "ACTIVE": 1, + "DELETED": 2, + } +) + +func (x Status) Enum() *Status { + p := new(Status) + *p = x + return p +} + +func (x Status) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (Status) Descriptor() protoreflect.EnumDescriptor { + return file_events_proto_enumTypes[0].Descriptor() +} + +func (Status) Type() protoreflect.EnumType { + return &file_events_proto_enumTypes[0] +} + +func (x Status) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use Status.Descriptor instead. +func (Status) EnumDescriptor() ([]byte, []int) { + return file_events_proto_rawDescGZIP(), []int{0} +} + +type TestProtobufLegacyEvent struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + When *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=when,proto3" json:"when,omitempty"` +} + +func (x *TestProtobufLegacyEvent) Reset() { + *x = TestProtobufLegacyEvent{} + if protoimpl.UnsafeEnabled { + mi := &file_events_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TestProtobufLegacyEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TestProtobufLegacyEvent) ProtoMessage() {} + +func (x *TestProtobufLegacyEvent) ProtoReflect() protoreflect.Message { + mi := &file_events_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TestProtobufLegacyEvent.ProtoReflect.Descriptor instead. +func (*TestProtobufLegacyEvent) Descriptor() ([]byte, []int) { + return file_events_proto_rawDescGZIP(), []int{0} +} + +func (x *TestProtobufLegacyEvent) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *TestProtobufLegacyEvent) GetWhen() *timestamppb.Timestamp { + if x != nil { + return x.When + } + return nil +} + +type SubEvent struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Tags []string `protobuf:"bytes,1,rep,name=tags,proto3" json:"tags,omitempty"` + Flags map[string]bool `protobuf:"bytes,2,rep,name=flags,proto3" json:"flags,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"` +} + +func (x *SubEvent) Reset() { + *x = SubEvent{} + if protoimpl.UnsafeEnabled { + mi := &file_events_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubEvent) ProtoMessage() {} + +func (x *SubEvent) ProtoReflect() protoreflect.Message { + mi := &file_events_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubEvent.ProtoReflect.Descriptor instead. +func (*SubEvent) Descriptor() ([]byte, []int) { + return file_events_proto_rawDescGZIP(), []int{1} +} + +func (x *SubEvent) GetTags() []string { + if x != nil { + return x.Tags + } + return nil +} + +func (x *SubEvent) GetFlags() map[string]bool { + if x != nil { + return x.Flags + } + return nil +} + +type TestComplexProtobufEvent struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` + When *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=when,proto3" json:"when,omitempty"` + // Complex fields to test edge cases + NestedMap map[string]*SubEvent `protobuf:"bytes,4,rep,name=nested_map,json=nestedMap,proto3" json:"nested_map,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Events []*SubEvent `protobuf:"bytes,5,rep,name=events,proto3" json:"events,omitempty"` + // Types that are assignable to Result: + // + // *TestComplexProtobufEvent_Success + // *TestComplexProtobufEvent_Error + // *TestComplexProtobufEvent_Fallback + Result isTestComplexProtobufEvent_Result `protobuf_oneof:"result"` +} + +func (x *TestComplexProtobufEvent) Reset() { + *x = TestComplexProtobufEvent{} + if protoimpl.UnsafeEnabled { + mi := &file_events_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TestComplexProtobufEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TestComplexProtobufEvent) ProtoMessage() {} + +func (x *TestComplexProtobufEvent) ProtoReflect() protoreflect.Message { + mi := &file_events_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TestComplexProtobufEvent.ProtoReflect.Descriptor instead. +func (*TestComplexProtobufEvent) Descriptor() ([]byte, []int) { + return file_events_proto_rawDescGZIP(), []int{2} +} + +func (x *TestComplexProtobufEvent) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *TestComplexProtobufEvent) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +func (x *TestComplexProtobufEvent) GetWhen() *timestamppb.Timestamp { + if x != nil { + return x.When + } + return nil +} + +func (x *TestComplexProtobufEvent) GetNestedMap() map[string]*SubEvent { + if x != nil { + return x.NestedMap + } + return nil +} + +func (x *TestComplexProtobufEvent) GetEvents() []*SubEvent { + if x != nil { + return x.Events + } + return nil +} + +func (m *TestComplexProtobufEvent) GetResult() isTestComplexProtobufEvent_Result { + if m != nil { + return m.Result + } + return nil +} + +func (x *TestComplexProtobufEvent) GetSuccess() *SubEvent { + if x, ok := x.GetResult().(*TestComplexProtobufEvent_Success); ok { + return x.Success + } + return nil +} + +func (x *TestComplexProtobufEvent) GetError() string { + if x, ok := x.GetResult().(*TestComplexProtobufEvent_Error); ok { + return x.Error + } + return "" +} + +func (x *TestComplexProtobufEvent) GetFallback() Status { + if x, ok := x.GetResult().(*TestComplexProtobufEvent_Fallback); ok { + return x.Fallback + } + return Status_STATUS_UNSPECIFIED +} + +type isTestComplexProtobufEvent_Result interface { + isTestComplexProtobufEvent_Result() +} + +type TestComplexProtobufEvent_Success struct { + Success *SubEvent `protobuf:"bytes,6,opt,name=success,proto3,oneof"` +} + +type TestComplexProtobufEvent_Error struct { + Error string `protobuf:"bytes,7,opt,name=error,proto3,oneof"` +} + +type TestComplexProtobufEvent_Fallback struct { + Fallback Status `protobuf:"varint,8,opt,name=fallback,proto3,enum=cqrs_test.Status,oneof"` +} + +func (*TestComplexProtobufEvent_Success) isTestComplexProtobufEvent_Result() {} + +func (*TestComplexProtobufEvent_Error) isTestComplexProtobufEvent_Result() {} + +func (*TestComplexProtobufEvent_Fallback) isTestComplexProtobufEvent_Result() {} + +var File_events_proto protoreflect.FileDescriptor + +var file_events_proto_rawDesc = []byte{ + 0x0a, 0x0c, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x09, + 0x63, 0x71, 0x72, 0x73, 0x5f, 0x74, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x59, 0x0a, 0x17, 0x54, 0x65, + 0x73, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x4c, 0x65, 0x67, 0x61, 0x63, 0x79, + 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x2e, 0x0a, 0x04, 0x77, 0x68, 0x65, 0x6e, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, + 0x04, 0x77, 0x68, 0x65, 0x6e, 0x22, 0x8e, 0x01, 0x0a, 0x08, 0x53, 0x75, 0x62, 0x45, 0x76, 0x65, + 0x6e, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, + 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x12, 0x34, 0x0a, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x18, + 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x63, 0x71, 0x72, 0x73, 0x5f, 0x74, 0x65, 0x73, + 0x74, 0x2e, 0x53, 0x75, 0x62, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x46, 0x6c, 0x61, 0x67, 0x73, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x1a, 0x38, 0x0a, 0x0a, + 0x46, 0x6c, 0x61, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xcb, 0x03, 0x0a, 0x18, 0x54, 0x65, 0x73, 0x74, 0x43, + 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x78, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x45, 0x76, + 0x65, 0x6e, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x2e, 0x0a, 0x04, 0x77, 0x68, 0x65, 0x6e, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x52, 0x04, 0x77, 0x68, 0x65, 0x6e, 0x12, 0x51, 0x0a, 0x0a, 0x6e, 0x65, 0x73, 0x74, 0x65, + 0x64, 0x5f, 0x6d, 0x61, 0x70, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x63, 0x71, + 0x72, 0x73, 0x5f, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x54, 0x65, 0x73, 0x74, 0x43, 0x6f, 0x6d, 0x70, + 0x6c, 0x65, 0x78, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x45, 0x76, 0x65, 0x6e, 0x74, + 0x2e, 0x4e, 0x65, 0x73, 0x74, 0x65, 0x64, 0x4d, 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, + 0x09, 0x6e, 0x65, 0x73, 0x74, 0x65, 0x64, 0x4d, 0x61, 0x70, 0x12, 0x2b, 0x0a, 0x06, 0x65, 0x76, + 0x65, 0x6e, 0x74, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x63, 0x71, 0x72, + 0x73, 0x5f, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x53, 0x75, 0x62, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, + 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x2f, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, + 0x73, 0x73, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x63, 0x71, 0x72, 0x73, 0x5f, + 0x74, 0x65, 0x73, 0x74, 0x2e, 0x53, 0x75, 0x62, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x48, 0x00, 0x52, + 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, + 0x72, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, + 0x12, 0x2f, 0x0a, 0x08, 0x66, 0x61, 0x6c, 0x6c, 0x62, 0x61, 0x63, 0x6b, 0x18, 0x08, 0x20, 0x01, + 0x28, 0x0e, 0x32, 0x11, 0x2e, 0x63, 0x71, 0x72, 0x73, 0x5f, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x53, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x48, 0x00, 0x52, 0x08, 0x66, 0x61, 0x6c, 0x6c, 0x62, 0x61, 0x63, + 0x6b, 0x1a, 0x51, 0x0a, 0x0e, 0x4e, 0x65, 0x73, 0x74, 0x65, 0x64, 0x4d, 0x61, 0x70, 0x45, 0x6e, + 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x29, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x63, 0x71, 0x72, 0x73, 0x5f, 0x74, 0x65, 0x73, 0x74, + 0x2e, 0x53, 0x75, 0x62, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x3a, 0x02, 0x38, 0x01, 0x42, 0x08, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x4a, 0x04, + 0x08, 0x17, 0x10, 0x1f, 0x2a, 0x39, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x16, + 0x0a, 0x12, 0x53, 0x54, 0x41, 0x54, 0x55, 0x53, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, + 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, + 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x44, 0x45, 0x4c, 0x45, 0x54, 0x45, 0x44, 0x10, 0x02, 0x42, + 0x0d, 0x5a, 0x0b, 0x2e, 0x2f, 0x63, 0x71, 0x72, 0x73, 0x5f, 0x74, 0x65, 0x73, 0x74, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_events_proto_rawDescOnce sync.Once + file_events_proto_rawDescData = file_events_proto_rawDesc +) + +func file_events_proto_rawDescGZIP() []byte { + file_events_proto_rawDescOnce.Do(func() { + file_events_proto_rawDescData = protoimpl.X.CompressGZIP(file_events_proto_rawDescData) + }) + return file_events_proto_rawDescData +} + +var file_events_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_events_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_events_proto_goTypes = []interface{}{ + (Status)(0), // 0: cqrs_test.Status + (*TestProtobufLegacyEvent)(nil), // 1: cqrs_test.TestProtobufLegacyEvent + (*SubEvent)(nil), // 2: cqrs_test.SubEvent + (*TestComplexProtobufEvent)(nil), // 3: cqrs_test.TestComplexProtobufEvent + nil, // 4: cqrs_test.SubEvent.FlagsEntry + nil, // 5: cqrs_test.TestComplexProtobufEvent.NestedMapEntry + (*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp +} +var file_events_proto_depIdxs = []int32{ + 6, // 0: cqrs_test.TestProtobufLegacyEvent.when:type_name -> google.protobuf.Timestamp + 4, // 1: cqrs_test.SubEvent.flags:type_name -> cqrs_test.SubEvent.FlagsEntry + 6, // 2: cqrs_test.TestComplexProtobufEvent.when:type_name -> google.protobuf.Timestamp + 5, // 3: cqrs_test.TestComplexProtobufEvent.nested_map:type_name -> cqrs_test.TestComplexProtobufEvent.NestedMapEntry + 2, // 4: cqrs_test.TestComplexProtobufEvent.events:type_name -> cqrs_test.SubEvent + 2, // 5: cqrs_test.TestComplexProtobufEvent.success:type_name -> cqrs_test.SubEvent + 0, // 6: cqrs_test.TestComplexProtobufEvent.fallback:type_name -> cqrs_test.Status + 2, // 7: cqrs_test.TestComplexProtobufEvent.NestedMapEntry.value:type_name -> cqrs_test.SubEvent + 8, // [8:8] is the sub-list for method output_type + 8, // [8:8] is the sub-list for method input_type + 8, // [8:8] is the sub-list for extension type_name + 8, // [8:8] is the sub-list for extension extendee + 0, // [0:8] is the sub-list for field type_name +} + +func init() { file_events_proto_init() } +func file_events_proto_init() { + if File_events_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_events_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TestProtobufLegacyEvent); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_events_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubEvent); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_events_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TestComplexProtobufEvent); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_events_proto_msgTypes[2].OneofWrappers = []interface{}{ + (*TestComplexProtobufEvent_Success)(nil), + (*TestComplexProtobufEvent_Error)(nil), + (*TestComplexProtobufEvent_Fallback)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_events_proto_rawDesc, + NumEnums: 1, + NumMessages: 5, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_events_proto_goTypes, + DependencyIndexes: file_events_proto_depIdxs, + EnumInfos: file_events_proto_enumTypes, + MessageInfos: file_events_proto_msgTypes, + }.Build() + File_events_proto = out.File + file_events_proto_rawDesc = nil + file_events_proto_goTypes = nil + file_events_proto_depIdxs = nil +} diff --git a/components/cqrs/marshaler_protobuf_events_test.go b/components/cqrs/marshaler_protobuf_events_test.go index 909ef7f59..2949f9081 100644 --- a/components/cqrs/marshaler_protobuf_events_test.go +++ b/components/cqrs/marshaler_protobuf_events_test.go @@ -1,158 +1,90 @@ // Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.31.0 -// protoc v4.24.4 -// source: events.proto +// source: testdata/events.proto package cqrs_test import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" - reflect "reflect" - sync "sync" -) + fmt "fmt" + math "math" -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) + proto "github.com/golang/protobuf/proto" + timestamp "github.com/golang/protobuf/ptypes/timestamp" ) -type TestProtobufEvent struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - When *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=when,proto3" json:"when,omitempty"` -} +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package -func (x *TestProtobufEvent) Reset() { - *x = TestProtobufEvent{} - if protoimpl.UnsafeEnabled { - mi := &file_events_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } +type TestProtobufEvent struct { + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + When *timestamp.Timestamp `protobuf:"bytes,3,opt,name=when,proto3" json:"when,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } -func (x *TestProtobufEvent) String() string { - return protoimpl.X.MessageStringOf(x) +func (m *TestProtobufEvent) Reset() { *m = TestProtobufEvent{} } +func (m *TestProtobufEvent) String() string { return proto.CompactTextString(m) } +func (*TestProtobufEvent) ProtoMessage() {} +func (*TestProtobufEvent) Descriptor() ([]byte, []int) { + return fileDescriptor_37faf0ac8d97ee4c, []int{0} } -func (*TestProtobufEvent) ProtoMessage() {} - -func (x *TestProtobufEvent) ProtoReflect() protoreflect.Message { - mi := &file_events_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) +func (m *TestProtobufEvent) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_TestProtobufEvent.Unmarshal(m, b) } - -// Deprecated: Use TestProtobufEvent.ProtoReflect.Descriptor instead. -func (*TestProtobufEvent) Descriptor() ([]byte, []int) { - return file_events_proto_rawDescGZIP(), []int{0} +func (m *TestProtobufEvent) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_TestProtobufEvent.Marshal(b, m, deterministic) +} +func (m *TestProtobufEvent) XXX_Merge(src proto.Message) { + xxx_messageInfo_TestProtobufEvent.Merge(m, src) +} +func (m *TestProtobufEvent) XXX_Size() int { + return xxx_messageInfo_TestProtobufEvent.Size(m) } +func (m *TestProtobufEvent) XXX_DiscardUnknown() { + xxx_messageInfo_TestProtobufEvent.DiscardUnknown(m) +} + +var xxx_messageInfo_TestProtobufEvent proto.InternalMessageInfo -func (x *TestProtobufEvent) GetId() string { - if x != nil { - return x.Id +func (m *TestProtobufEvent) GetId() string { + if m != nil { + return m.Id } return "" } -func (x *TestProtobufEvent) GetWhen() *timestamppb.Timestamp { - if x != nil { - return x.When +func (m *TestProtobufEvent) GetWhen() *timestamp.Timestamp { + if m != nil { + return m.When } return nil } -var File_events_proto protoreflect.FileDescriptor - -var file_events_proto_rawDesc = []byte{ - 0x0a, 0x0c, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x09, - 0x63, 0x71, 0x72, 0x73, 0x5f, 0x74, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, - 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x53, 0x0a, 0x11, 0x54, 0x65, - 0x73, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, - 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, - 0x2e, 0x0a, 0x04, 0x77, 0x68, 0x65, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, - 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, - 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x04, 0x77, 0x68, 0x65, 0x6e, 0x42, - 0x0d, 0x5a, 0x0b, 0x2e, 0x2f, 0x63, 0x71, 0x72, 0x73, 0x5f, 0x74, 0x65, 0x73, 0x74, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_events_proto_rawDescOnce sync.Once - file_events_proto_rawDescData = file_events_proto_rawDesc -) - -func file_events_proto_rawDescGZIP() []byte { - file_events_proto_rawDescOnce.Do(func() { - file_events_proto_rawDescData = protoimpl.X.CompressGZIP(file_events_proto_rawDescData) - }) - return file_events_proto_rawDescData -} - -var file_events_proto_msgTypes = make([]protoimpl.MessageInfo, 1) -var file_events_proto_goTypes = []interface{}{ - (*TestProtobufEvent)(nil), // 0: cqrs_test.TestProtobufEvent - (*timestamppb.Timestamp)(nil), // 1: google.protobuf.Timestamp -} -var file_events_proto_depIdxs = []int32{ - 1, // 0: cqrs_test.TestProtobufEvent.when:type_name -> google.protobuf.Timestamp - 1, // [1:1] is the sub-list for method output_type - 1, // [1:1] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name +func init() { + proto.RegisterType((*TestProtobufEvent)(nil), "cqrs_test.TestProtobufEvent") } -func init() { file_events_proto_init() } -func file_events_proto_init() { - if File_events_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_events_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*TestProtobufEvent); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_events_proto_rawDesc, - NumEnums: 0, - NumMessages: 1, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_events_proto_goTypes, - DependencyIndexes: file_events_proto_depIdxs, - MessageInfos: file_events_proto_msgTypes, - }.Build() - File_events_proto = out.File - file_events_proto_rawDesc = nil - file_events_proto_goTypes = nil - file_events_proto_depIdxs = nil +func init() { proto.RegisterFile("testdata/events.proto", fileDescriptor_37faf0ac8d97ee4c) } + +var fileDescriptor_37faf0ac8d97ee4c = []byte{ + // 146 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2d, 0x49, 0x2d, 0x2e, + 0x49, 0x49, 0x2c, 0x49, 0xd4, 0x4f, 0x2d, 0x4b, 0xcd, 0x2b, 0x29, 0xd6, 0x2b, 0x28, 0xca, 0x2f, + 0xc9, 0x17, 0xe2, 0x4c, 0x2e, 0x2c, 0x2a, 0x8e, 0x07, 0xc9, 0x49, 0xc9, 0xa7, 0xe7, 0xe7, 0xa7, + 0xe7, 0xa4, 0xea, 0x83, 0x25, 0x92, 0x4a, 0xd3, 0xf4, 0x4b, 0x32, 0x73, 0x53, 0x8b, 0x4b, 0x12, + 0x73, 0x0b, 0x20, 0x6a, 0x95, 0x82, 0xb9, 0x04, 0x43, 0x52, 0x8b, 0x4b, 0x02, 0xa0, 0xf2, 0xae, + 0x20, 0x73, 0x84, 0xf8, 0xb8, 0x98, 0x32, 0x53, 0x24, 0x18, 0x15, 0x18, 0x35, 0x38, 0x83, 0x98, + 0x32, 0x53, 0x84, 0xf4, 0xb8, 0x58, 0xca, 0x33, 0x52, 0xf3, 0x24, 0x98, 0x15, 0x18, 0x35, 0xb8, + 0x8d, 0xa4, 0xf4, 0x20, 0x86, 0xea, 0xc1, 0x0c, 0xd5, 0x0b, 0x81, 0x19, 0x1a, 0x04, 0x56, 0x97, + 0xc4, 0x06, 0x96, 0x31, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0xff, 0x4e, 0x39, 0x0e, 0xa0, 0x00, + 0x00, 0x00, } diff --git a/components/cqrs/marshaler_protobuf_gogo.go b/components/cqrs/marshaler_protobuf_gogo.go new file mode 100644 index 000000000..39c5e0627 --- /dev/null +++ b/components/cqrs/marshaler_protobuf_gogo.go @@ -0,0 +1,127 @@ +package cqrs + +import ( + "fmt" + "runtime/debug" + + stderrors "errors" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/gogo/protobuf/proto" + stdproto "github.com/golang/protobuf/proto" + "github.com/pkg/errors" +) + +// ProtobufMarshaler a protobuf marshaler using github.com/gogo/protobuf/proto (deprecated). +// +// DEPRECATED: Use ProtoMarshaler instead. This marshaler will not work with newer protobuf files. +// IMPORTANT: This marshaler is backward and forward compatible with ProtoMarshaler. +// ProtobufMarshaler from Watermill versions until v1.4.3 are not forward compatible with ProtoMarshaler. +// Suggested migration steps: +// 1. Update Watermill to v1.4.4 or newer, so all publishers and subscribers will be forward and backward compatible. +// 2. Change all usages of ProtobufMarshaler to ProtoMarshaler. +type ProtobufMarshaler struct { + NewUUID func() string + GenerateName func(v interface{}) string + + // DisableStdProtoFallback disables fallback to github.com/golang/protobuf/proto when github.com/gogo/protobuf/proto + // because receiving a message that was marshaled with github.com/golang/protobuf/proto. + // Fallback is enabled by default to enable migration to ProtoMarshaler. + DisableStdProtoFallback bool +} + +// Marshal marshals the given protobuf's message into watermill's Message. +func (m ProtobufMarshaler) Marshal(v interface{}) (msg *message.Message, err error) { + defer func() { + // gogo proto can panic on unmarshal (for example, because it received a message from ProtoMarshaler with oneof) + if r := recover(); r != nil { + err = stderrors.Join(err, fmt.Errorf( + "github.com/gogo/protobuf/proto panic (we recommend migrating marshaler to cqrs.ProtoMarshaler to avoid that): %v\n%s", + r, + string(debug.Stack()), + )) + } + + if err != nil && !m.DisableStdProtoFallback { + _, isStdProtoMsg := v.(stdproto.Message) + + if isStdProtoMsg { + msg, err = m.ToProtoMarshaler().Marshal(v) + } + } + }() + + protoMsg, ok := v.(proto.Message) + if !ok { + return nil, errors.WithStack(NoProtoMessageError{v}) + } + + b, err := proto.Marshal(protoMsg) + if err != nil { + return nil, err + } + + msg = message.NewMessage( + m.newUUID(), + b, + ) + msg.Metadata.Set("name", m.Name(v)) + + return msg, nil +} + +func (m ProtobufMarshaler) newUUID() string { + if m.NewUUID != nil { + return m.NewUUID() + } + + // default + return watermill.NewUUID() +} + +// Unmarshal unmarshals given watermill's Message into protobuf's message. +func (m ProtobufMarshaler) Unmarshal(msg *message.Message, v interface{}) (err error) { + protoV, ok := v.(proto.Message) + if !ok { + return errors.WithStack(NoProtoMessageError{v}) + } + + defer func() { + // gogo proto can panic on unmarshal (for example, because it received a message from ProtoMarshaler with oneof) + if r := recover(); r != nil { + err = stderrors.Join(err, fmt.Errorf( + "github.com/gogo/protobuf/proto panic (we recommend migrating marshaler to cqrs.ProtoMarshaler to avoid that): %v\n%s", + r, + string(debug.Stack()), + )) + } + + if err != nil && !m.DisableStdProtoFallback { + err = m.ToProtoMarshaler().Unmarshal(msg, v) + } + }() + + return proto.Unmarshal(msg.Payload, protoV) +} + +func (m ProtobufMarshaler) ToProtoMarshaler() ProtoMarshaler { + return ProtoMarshaler{ + NewUUID: m.NewUUID, + GenerateName: m.GenerateName, + } +} + +// Name returns the command or event's name. +func (m ProtobufMarshaler) Name(cmdOrEvent interface{}) string { + if m.GenerateName != nil { + return m.GenerateName(cmdOrEvent) + } + + return FullyQualifiedStructName(cmdOrEvent) +} + +// NameFromMessage returns the metadata name value for a given Message. +func (m ProtobufMarshaler) NameFromMessage(msg *message.Message) string { + return msg.Metadata.Get("name") +} diff --git a/components/cqrs/marshaler_protobuf_gogo_test.go b/components/cqrs/marshaler_protobuf_gogo_test.go new file mode 100644 index 000000000..625c91e8a --- /dev/null +++ b/components/cqrs/marshaler_protobuf_gogo_test.go @@ -0,0 +1,142 @@ +package cqrs_test + +import ( + "testing" + + "github.com/ThreeDotsLabs/watermill/components/cqrs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestProtobufMarshaler_with_fallback(t *testing.T) { + marshaler := cqrs.ProtobufMarshaler{} + + assertProtoMarshalUnmarshal( + t, + marshaler, + marshaler, + newProtoTestComplexEvent(), + &TestComplexProtobufEvent{}, + "cqrs_test.TestComplexProtobufEvent", + ) + + legacyEvent, _ := newProtoLegacyTestEvent() + assertProtoMarshalUnmarshal( + t, + marshaler, + marshaler, + legacyEvent, + &TestProtobufEvent{}, + "cqrs_test.TestProtobufEvent", + ) +} + +func TestProtobufMarshaler_without_fallback_legacy_event(t *testing.T) { + legacyEvent, _ := newProtoLegacyTestEvent() + + marshaler := cqrs.ProtobufMarshaler{ + DisableStdProtoFallback: true, + } + + assertProtoMarshalUnmarshal( + t, + marshaler, + marshaler, + legacyEvent, + &TestProtobufEvent{}, + "cqrs_test.TestProtobufEvent", + ) +} + +func TestProtobufMarshaler_Marshal_generated_name(t *testing.T) { + marshaler := cqrs.ProtobufMarshaler{ + NewUUID: func() string { + return "foo" + }, + } + + msg, err := marshaler.Marshal(newProtoTestComplexEvent()) + require.NoError(t, err) + + assert.Equal(t, msg.UUID, "foo") +} + +func TestProtobufMarshaler_catch_panic(t *testing.T) { + marshalerNoFallback := cqrs.ProtobufMarshaler{ + DisableStdProtoFallback: true, + } + marshalerWithFallback := cqrs.ProtobufMarshaler{ + DisableStdProtoFallback: false, + } + + complexEvent := newProtoTestComplexEvent() + + msg, err := marshalerNoFallback.Marshal(complexEvent) + assert.Nil(t, msg) + assert.ErrorContains(t, err, "(we recommend migrating marshaler to cqrs.ProtoMarshaler to avoid that)") + assert.ErrorContains(t, err, "invalid memory address or nil pointer dereference") + assert.ErrorContains(t, err, "github.com/gogo/protobuf/proto panic") + assert.ErrorContains(t, err, "runtime/debug.Stack()", "error should contain stack trace") + + // let's simulate situation when publishing service uses fallback and consuming service does not + msg, err = marshalerWithFallback.Marshal(complexEvent) + require.NoError(t, err) + + err = marshalerNoFallback.Unmarshal(msg, &TestComplexProtobufEvent{}) + assert.ErrorContains(t, err, "(we recommend migrating marshaler to cqrs.ProtoMarshaler to avoid that)") + assert.ErrorContains(t, err, "protobuf tag not enough fields in TestComplexProtobufEvent.state") + assert.ErrorContains(t, err, "github.com/gogo/protobuf/proto panic") + assert.ErrorContains(t, err, "runtime/debug.Stack()", "error should contain stack trace") + + // marshaler with fallback should handle this message + err = marshalerWithFallback.Unmarshal(msg, &TestComplexProtobufEvent{}) + require.NoError(t, err) +} + +func TestProtobufMarshaler_compatible_with_ProtoMarshaler(t *testing.T) { + legacyEvent, legacyEventRegenerated := newProtoLegacyTestEvent() + complexEvent := newProtoTestComplexEvent() + + deprecatedMarshaler := cqrs.ProtobufMarshaler{} + newMarshaler := cqrs.ProtoMarshaler{} + + t.Run("from_deprecated_to_new", func(t *testing.T) { + assertProtoMarshalUnmarshal( + t, + deprecatedMarshaler, + newMarshaler, + complexEvent, + &TestComplexProtobufEvent{}, + "cqrs_test.TestComplexProtobufEvent", + ) + + assertProtoMarshalUnmarshal( + t, + deprecatedMarshaler, + newMarshaler, + legacyEvent, + &TestProtobufLegacyEvent{}, + "cqrs_test.TestProtobufEvent", + ) + }) + + t.Run("from_new_to_deprecated", func(t *testing.T) { + assertProtoMarshalUnmarshal( + t, + newMarshaler, + deprecatedMarshaler, + complexEvent, + &TestComplexProtobufEvent{}, + "cqrs_test.TestComplexProtobufEvent", + ) + + assertProtoMarshalUnmarshal( + t, + newMarshaler, + deprecatedMarshaler, + legacyEventRegenerated, + &TestProtobufEvent{}, + "cqrs_test.TestProtobufLegacyEvent", + ) + }) +} diff --git a/components/cqrs/marshaler_protobuf_test.go b/components/cqrs/marshaler_protobuf_test.go index 276ea83dc..d6ae116e5 100644 --- a/components/cqrs/marshaler_protobuf_test.go +++ b/components/cqrs/marshaler_protobuf_test.go @@ -1,6 +1,8 @@ package cqrs_test import ( + "encoding/json" + "fmt" "testing" "time" @@ -12,41 +14,101 @@ import ( "github.com/ThreeDotsLabs/watermill/components/cqrs" ) -func TestProtobufMarshaler(t *testing.T) { - marshaler := cqrs.ProtobufMarshaler{} +func TestProtoMarshaler(t *testing.T) { + assertProtoMarshalUnmarshal( + t, + cqrs.ProtoMarshaler{}, + cqrs.ProtoMarshaler{}, + newProtoTestComplexEvent(), + &TestComplexProtobufEvent{}, + "cqrs_test.TestComplexProtobufEvent", + ) +} - when := timestamppb.New(time.Now()) - eventToMarshal := &TestProtobufEvent{ - Id: watermill.NewULID(), - When: when, +func TestProtoMarshaler_Marshal_generated_name(t *testing.T) { + marshaler := cqrs.ProtoMarshaler{ + NewUUID: func() string { + return "foo" + }, } - msg, err := marshaler.Marshal(eventToMarshal) - require.NoError(t, err) - - eventToUnmarshal := &TestProtobufEvent{} - err = marshaler.Unmarshal(msg, eventToUnmarshal) + msg, err := marshaler.Marshal(newProtoTestComplexEvent()) require.NoError(t, err) - assert.EqualValues(t, eventToMarshal.String(), eventToUnmarshal.String()) - assert.Equal(t, msg.Metadata.Get("name"), "cqrs_test.TestProtobufEvent") + assert.Equal(t, msg.UUID, "foo") } -func TestProtobufMarshaler_Marshal_generated_name(t *testing.T) { - marshaler := cqrs.ProtobufMarshaler{ - NewUUID: func() string { - return "foo" - }, +// newProtoLegacyTestEvent returns the same event in two different protobuf versions +func newProtoLegacyTestEvent() (*TestProtobufEvent, *TestProtobufLegacyEvent) { + when := timestamppb.New(time.Now()) + id := watermill.NewULID() + + legacy := &TestProtobufEvent{ + Id: id, + When: when, } + regenerated := &TestProtobufLegacyEvent{ + Id: id, + When: when, + } + + return legacy, regenerated +} + +func newProtoTestComplexEvent() *TestComplexProtobufEvent { when := timestamppb.New(time.Now()) - eventToMarshal := &TestProtobufEvent{ + + eventToMarshal := &TestComplexProtobufEvent{ Id: watermill.NewULID(), + Data: []byte("data"), When: when, + NestedMap: map[string]*SubEvent{ + "foo": { + Tags: []string{"tag1", "tag2"}, + Flags: map[string]bool{"flag1": true, "flag2": false}, + }, + }, + Events: []*SubEvent{ + { + Tags: []string{"tag1", "tag2"}, + }, + { + Tags: []string{"tag3", "tag4"}, + }, + }, + Result: &TestComplexProtobufEvent_Success{ + Success: &SubEvent{ + Tags: []string{"tag10"}, + Flags: map[string]bool{"flag10": true}, + }, + }, } + return eventToMarshal +} + +func assertProtoMarshalUnmarshal[T1, T2 fmt.Stringer]( + t *testing.T, + marshaler cqrs.CommandEventMarshaler, + unmarshaler cqrs.CommandEventMarshaler, + eventToMarshal T1, + eventToUnmarshal T2, + expectedEventName string, +) { + t.Helper() msg, err := marshaler.Marshal(eventToMarshal) require.NoError(t, err) - assert.Equal(t, msg.UUID, "foo") + err = unmarshaler.Unmarshal(msg, eventToUnmarshal) + require.NoError(t, err) + + eventToMarshalJson, err := json.Marshal(eventToMarshal) + require.NoError(t, err) + + eventToUnmarshalJson, err := json.Marshal(eventToUnmarshal) + require.NoError(t, err) + + assert.JSONEq(t, string(eventToMarshalJson), string(eventToUnmarshalJson)) + assert.Equal(t, expectedEventName, msg.Metadata.Get("name")) } diff --git a/components/cqrs/testdata/events.proto b/components/cqrs/testdata/events.proto index c005d9f54..4de0c9827 100644 --- a/components/cqrs/testdata/events.proto +++ b/components/cqrs/testdata/events.proto @@ -4,7 +4,35 @@ option go_package = "./cqrs_test"; import "google/protobuf/timestamp.proto"; -message TestProtobufEvent { +message TestProtobufLegacyEvent { string id = 1; google.protobuf.Timestamp when = 3; } + +enum Status { + STATUS_UNSPECIFIED = 0; + ACTIVE = 1; + DELETED = 2; +} + +message SubEvent { + repeated string tags = 1; + map flags = 2; +} + +message TestComplexProtobufEvent { + string id = 1; + bytes data = 2; + google.protobuf.Timestamp when = 3; + + map nested_map = 4; + repeated SubEvent events = 5; + + oneof result { + SubEvent success = 6; + string error = 7; + Status fallback = 8; + } + + reserved 23 to 30; +} diff --git a/go.mod b/go.mod index 0c2ec42e2..78c583009 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,8 @@ go 1.21 require ( github.com/cenkalti/backoff/v3 v3.2.2 github.com/go-chi/chi/v5 v5.1.0 + github.com/gogo/protobuf v1.3.2 + github.com/golang/protobuf v1.5.0 github.com/google/uuid v1.6.0 github.com/hashicorp/go-multierror v1.1.1 github.com/lithammer/shortuuid/v3 v3.0.7 diff --git a/go.sum b/go.sum index 7c665da5a..e178719e2 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,11 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -20,6 +25,8 @@ github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -54,8 +61,36 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=