Skip to content

Commit

Permalink
Watermill Test Helpers (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
bbengfort authored Jun 13, 2023
1 parent aed3fa9 commit 9f17cc2
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 20 deletions.
49 changes: 49 additions & 0 deletions api/v1beta1/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@ package api
import (
"errors"
"fmt"
"regexp"
"strconv"
"strings"

"github.com/oklog/ulid/v2"
"google.golang.org/protobuf/proto"
)

// Wrap an event to create a complete protocol buffer to send to the Ensign server.
func (w *EventWrapper) Wrap(e *Event) (err error) {
if w.Event, err = proto.Marshal(e); err != nil {
return err
}
return nil
}

// Unwrap an event from the event wrapper for user consumption.
func (w *EventWrapper) Unwrap() (e *Event, err error) {
if len(w.Event) == 0 {
return nil, errors.New("event wrapper contains no event")
Expand All @@ -28,15 +32,60 @@ func (w *EventWrapper) Unwrap() (e *Event, err error) {
return e, nil
}

// Parse the TopicID as a ULID.
func (w *EventWrapper) ParseTopicID() (topicID ulid.ULID, err error) {
err = topicID.UnmarshalBinary(w.TopicId)
return topicID, err
}

// Returns the type name and semantic version as a whole string.
func (t *Type) Version() string {
return fmt.Sprintf("%s v%d.%d.%d", t.Name, t.MajorVersion, t.MinorVersion, t.PatchVersion)
}

// Returns just the semantic version of the type.
func (t *Type) Semver() string {
return fmt.Sprintf("%d.%d.%d", t.MajorVersion, t.MinorVersion, t.PatchVersion)
}

var (
ErrSemverParse = errors.New("could not parse version string as a semantic version 2.0.0")
semverPattern = regexp.MustCompile(`^(?P<major>0|[1-9]\d*)\.(?P<minor>0|[1-9]\d*)\.(?P<patch>0|[1-9]\d*)(?:-(?P<prerelease>(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+(?P<buildmetadata>[0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$`)
)

// Parses the semver 2.0.0 string and loads the information into the type.
// See https://semver.org/ and https://regex101.com/r/Ly7O1x/3/ for more on parsing.
// NOTE: any extensions of the version such as build and release are omitted, only the
// major, minor, and patch versions are added to the type.
func (t *Type) ParseSemver(version string) (err error) {
if !semverPattern.MatchString(version) {
return ErrSemverParse
}

matches := semverPattern.FindStringSubmatch(version)
if t.MajorVersion, err = parseUint32(matches[1]); err != nil {
return ErrSemverParse
}

if t.MinorVersion, err = parseUint32(matches[2]); err != nil {
return ErrSemverParse
}

if t.PatchVersion, err = parseUint32(matches[3]); err != nil {
return ErrSemverParse
}

return nil
}

func parseUint32(s string) (uint32, error) {
i, err := strconv.ParseUint(s, 10, 32)
if err != nil {
return 0, err
}
return uint32(i), nil
}

// Equals treats the name as case-insensitive.
func (t *Type) Equals(o *Type) bool {
tname := strings.TrimSpace(strings.ToLower(t.Name))
Expand Down
46 changes: 46 additions & 0 deletions api/v1beta1/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,49 @@ func TestType(t *testing.T) {
tc.assert(t, tc.alpha.Equals(tc.bravo), "test case %d failed", i)
}
}

func TestTypeSemver(t *testing.T) {
testCases := []struct {
name string
vers string
expected string
semver string
}{
{"GameStarted", "1.4.2", "GameStarted v1.4.2", "1.4.2"},
{"GameStarted", "0.0.4", "GameStarted v0.0.4", "0.0.4"},
{"GameStarted", "1.2.3", "GameStarted v1.2.3", "1.2.3"},
{"GameStarted", "10.20.30", "GameStarted v10.20.30", "10.20.30"},
{"GameStarted", "1.1.2-prerelease+meta", "GameStarted v1.1.2", "1.1.2"},
{"GameStarted", "1.1.2+meta-valid", "GameStarted v1.1.2", "1.1.2"},
{"GameStarted", "1.1.2-alpha", "GameStarted v1.1.2", "1.1.2"},
{"GameStarted", "1.1.2-beta", "GameStarted v1.1.2", "1.1.2"},
{"GameStarted", "1.1.2-alpha.1", "GameStarted v1.1.2", "1.1.2"},
{"GameStarted", "1.1.2-rc.1+build.123", "GameStarted v1.1.2", "1.1.2"},
{"Foo", "999999999.999999999.999999999", "Foo v999999999.999999999.999999999", "999999999.999999999.999999999"},
}

for i, tc := range testCases {
eventType := &api.Type{Name: tc.name}

err := eventType.ParseSemver(tc.vers)
require.NoError(t, err, "could not parse semver for test case %d", i)

require.Equal(t, tc.expected, eventType.Version(), "mismatched version in test case %d", i)
require.Equal(t, tc.semver, eventType.Semver(), "mismatched semver in test case %d", i)
}
}

func TestTypeBadSemver(t *testing.T) {
testCases := []string{
"1.0", "a.b.c", "", "not a version", "2", "1.2.3.4",
"99999999999999999999999.999999999999999999.99999999999999999",
"1.999999999999999999.99999999999999999",
"1.1.99999999999999999",
}

for _, tc := range testCases {
eventType := &api.Type{Name: "Bad version"}
err := eventType.ParseSemver(tc)
require.Error(t, err, "expected semver parsing error for %q", tc)
}
}
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var (
ErrTopicNameNotFound = errors.New("topic name not found in project")
ErrCannotAck = errors.New("cannot ack or nack an event not received from subscribe")
ErrOverwrite = errors.New("this operation would overwrite existing event data")
ErrNoTopicID = errors.New("topic id is not available on event")
)

type Errorer interface {
Expand Down
103 changes: 93 additions & 10 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"sync"
"time"

"github.com/oklog/ulid/v2"
api "github.com/rotationalio/go-ensign/api/v1beta1"
mimetype "github.com/rotationalio/go-ensign/mimetype/v1beta1"
"github.com/rotationalio/go-ensign/stream"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -46,7 +46,17 @@ type Event struct {
ctx context.Context
err error
pub <-chan *api.PublisherReply
sub *stream.Subscriber
sub Acknowledger
}

// Acknowledger allows consumers to send acks/nacks back to the server when they have
// successfully processed an event. An ack means that the event was processed and the
// consumer group offset can move on, while a nack means there was a local error and the
// nack code instructs the server how to handle the event. The subscriber implements
// this interface, but this can be mocked for testing events.
type Acknowledger interface {
Ack(*api.Ack) error
Nack(*api.Nack) error
}

type eventState uint8
Expand All @@ -59,18 +69,73 @@ const (
nacked // event has been nacked from user or server
)

// Returns the event ID if the event has been published; otherwise returns nil.
func (e *Event) ID() []byte {
const (
rlidSize = 10
encodedSize = 16
encoding = "0123456789abcdefghjkmnpqrstvwxyz"
)

// Returns the event ID if the event has been published; otherwise returns empty string.
func (e *Event) ID() string {
if e.info != nil && len(e.info.Id) > 0 {
return e.info.Id
// TODO: this is a port of the RLID encoding; is this the best way to encode?
if len(e.info.Id) == rlidSize {
dst := make([]byte, encodedSize)
dst[0] = encoding[(e.info.Id[0]&248)>>3]
dst[1] = encoding[((e.info.Id[0]&7)<<2)|((e.info.Id[1]&192)>>6)]
dst[2] = encoding[(e.info.Id[1]&62)>>1]
dst[3] = encoding[((e.info.Id[1]&1)<<4)|((e.info.Id[2]&240)>>4)]
dst[4] = encoding[((e.info.Id[2]&15)<<1)|((e.info.Id[3]&128)>>7)]
dst[5] = encoding[(e.info.Id[3]&124)>>2]
dst[6] = encoding[((e.info.Id[3]&3)<<3)|((e.info.Id[4]&224)>>5)]
dst[7] = encoding[e.info.Id[4]&31]
dst[8] = encoding[(e.info.Id[5]&248)>>3]
dst[9] = encoding[((e.info.Id[5]&7)<<2)|((e.info.Id[6]&192)>>6)]
dst[10] = encoding[(e.info.Id[6]&62)>>1]
dst[11] = encoding[((e.info.Id[6]&1)<<4)|((e.info.Id[7]&240)>>4)]
dst[12] = encoding[((e.info.Id[7]&15)<<1)|((e.info.Id[8]&128)>>7)]
dst[13] = encoding[(e.info.Id[8]&124)>>2]
dst[14] = encoding[((e.info.Id[8]&3)<<3)|((e.info.Id[9]&224)>>5)]
dst[15] = encoding[e.info.Id[9]&31]
return string(dst)
}
return fmt.Sprintf("%X", e.info.Id)
}
return nil
return ""
}

// Returns the topic ID that the event was published to if available; otherwise returns nil.
func (e *Event) TopicID() []byte {
// Returns the topic ID that the event was published to if available; otherwise returns
// an empty string. The TopicID is a ULID, the ULID can be parsed without going through
// a string representation using the TopicULID method. If the TopicID cannot be parsed
// as a ULID then a hexadecimal representation of the ID is returned. See the error from
// TopicULID for more info about what went wrong.
func (e *Event) TopicID() string {
if e.info != nil && len(e.info.TopicId) > 0 {
return e.info.TopicId
topicID, err := e.TopicULID()
if err != nil {
return fmt.Sprintf("%X", e.info.TopicId)
}
return topicID.String()
}
return ""
}

// Returns the topic ULID that the event was published to if available, otherwise
// returns an error if there is no info, the topic ID is nil, or was unparseable.
func (e *Event) TopicULID() (topicID ulid.ULID, err error) {
if e.info != nil && len(e.info.TopicId) > 0 {
err = topicID.UnmarshalBinary(e.info.TopicId)
return topicID, err
}
return topicID, ErrNoTopicID
}

// Returns the topic ID that the event was published to if available; otherwise returns
// nil. This method is primarily for testing and debugging purposes; users should use
// the metadata to store application-specific ID material.
func (e *Event) LocalID() []byte {
if e.info != nil && len(e.info.LocalId) > 0 {
return e.info.LocalId
}
return nil
}
Expand Down Expand Up @@ -270,7 +335,7 @@ func (e *Event) Equals(o *Event) bool {
}

// Convert an event into a protocol buffer event.
func (e *Event) toPB() *api.Event {
func (e *Event) Proto() *api.Event {
return &api.Event{
Data: e.Data,
Metadata: map[string]string(e.Metadata),
Expand Down Expand Up @@ -308,3 +373,21 @@ func (e *Event) fromPB(wrapper *api.EventWrapper, state eventState) (err error)

return nil
}

// Creates a new outgoing event to be published. This method is generally used by tests
// to create mock events with the acked/nacked channels listening for a response from
// the publisher stream.
func NewOutgoingEvent(e *api.EventWrapper, pub <-chan *api.PublisherReply) *Event {
event := &Event{pub: pub}
event.fromPB(e, published)
return event
}

// Creates a new incoming event as though it were from a subscription. This method is
// generally used by tests to crate mock events with an acknowledger for ensuring that
// an event is correctly acked/nacked to the consumer stream.
func NewIncomingEvent(e *api.EventWrapper, sub Acknowledger) *Event {
event := &Event{sub: sub}
event.fromPB(e, subscription)
return event
}
70 changes: 70 additions & 0 deletions event_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package ensign_test

import (
"bytes"
"math/rand"
"testing"
"time"

"github.com/oklog/ulid/v2"
"github.com/rotationalio/go-ensign"
api "github.com/rotationalio/go-ensign/api/v1beta1"
mimetype "github.com/rotationalio/go-ensign/mimetype/v1beta1"
"github.com/stretchr/testify/require"
)

// NewEvent returns a new random event for testing purposes.
Expand All @@ -25,3 +29,69 @@ func NewEvent() *ensign.Event {
rand.Read(event.Data)
return event
}

func TestEventIDParsing(t *testing.T) {
testCases := []struct {
input []byte
expected string
}{
{nil, ""},
{[]byte{}, ""},
{[]byte{0x42}, "42"},
{[]byte{0x01, 0x83, 0x42, 0x5F, 0x66, 0x6F, 0x00, 0x6F, 0xEB, 0x6B}, "061m4qv6dw06ztvb"},
{[]byte{0x01, 0x83, 0x42, 0x5F, 0x66, 0x6F, 0x00, 0x6F, 0xEB, 0x6B, 0x66, 0x6F, 0x00, 0x6F, 0xEB, 0x6B}, "0183425F666F006FEB6B666F006FEB6B"},
{bytes.Repeat([]byte{0x42, 0xef}, 10), "42EF42EF42EF42EF42EF42EF42EF42EF42EF42EF"},
{bytes.Repeat([]byte{0x42}, 10), "89144gj289144gj2"},
}

for i, tc := range testCases {
evt := &api.EventWrapper{Id: tc.input}
out := ensign.NewOutgoingEvent(evt, nil)

require.Equal(t, tc.expected, out.ID(), "test case %d did not parse outgoing event correctly", i)

inc := ensign.NewIncomingEvent(evt, nil)
require.Equal(t, tc.expected, inc.ID(), "test case %d did not parse incoming event correctly", i)
}
}

func TestTopicIDParsing(t *testing.T) {
testCases := []struct {
input []byte
expected string
topicID ulid.ULID
err error
}{
{nil, "", ulid.ULID{}, ensign.ErrNoTopicID},
{[]byte{}, "", ulid.ULID{}, ensign.ErrNoTopicID},
{[]byte{0x41}, "41", ulid.ULID{}, ulid.ErrDataSize},
{ulid.MustParse("01H2RT8KB5TZZT4NPNPCJD4A1B").Bytes(), "01H2RT8KB5TZZT4NPNPCJD4A1B", ulid.MustParse("01H2RT8KB5TZZT4NPNPCJD4A1B"), nil},
{bytes.Repeat([]byte{0x42}, 16), "2289144GJ289144GJ289144GJ2", ulid.MustParse("2289144GJ289144GJ289144GJ2"), nil},
{bytes.Repeat([]byte{0x42, 0xef}, 10), "42EF42EF42EF42EF42EF42EF42EF42EF42EF42EF", ulid.ULID{}, ulid.ErrDataSize},
}

for i, tc := range testCases {
evt := &api.EventWrapper{TopicId: tc.input}
out := ensign.NewOutgoingEvent(evt, nil)

require.Equal(t, tc.expected, out.TopicID(), "test case %d did not parse outgoing event correctly", i)

tid, err := out.TopicULID()
if tc.err != nil {
require.ErrorIs(t, err, tc.err)
} else {
require.Equal(t, tc.topicID, tid)
}

inc := ensign.NewIncomingEvent(evt, nil)
require.Equal(t, tc.expected, inc.TopicID(), "test case %d did not parse incoming event correctly", i)

tid, err = inc.TopicULID()
if tc.err != nil {
require.ErrorIs(t, err, tc.err)
} else {
require.Equal(t, tc.topicID, tid)
}

}
}
6 changes: 5 additions & 1 deletion publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ func (c *Client) Publish(topic string, events ...*Event) (err error) {

// Attempt to send all events to the server, stopping on the first error.
for _, event := range events {
if event.pub, err = c.pub.Publish(topic, event.toPB()); err != nil {
// Publish the event and collect the event info and reply channel.
if event.info, event.pub, err = c.pub.Publish(topic, event.Proto()); err != nil {
return err
}

// Ensure the event state is set to published.
event.state = published
}
return nil
}
Expand Down
Loading

0 comments on commit 9f17cc2

Please sign in to comment.