Skip to content

Commit

Permalink
expanding HTTP publishers to implement the MultiPublisher interface (#…
Browse files Browse the repository at this point in the history
…155)

* expanding HTTP publishers to implement the MultiPublisher interface

* adding tests
  • Loading branch information
jprobinson authored Oct 9, 2018
1 parent 0e7f5a1 commit 66323c4
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 10 deletions.
76 changes: 66 additions & 10 deletions pubsub/http/pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,46 +13,75 @@ import (
"golang.org/x/net/context"
)

type publisher struct {
// Publisher implements the pubsub.Publisher and MultiPublisher interfaces for use in a
// plain HTTP environment.
type Publisher struct {
url string
client *http.Client
}

type gcpPublisher struct {
// GCPPublisher publishes data in the same format as a GCP push-style payload.
type GCPPublisher struct {
pubsub.Publisher
}

// NewPublisher will return a pubsub.Publisher that simply posts the payload to
// the given URL. If no http.Client is provided, the default one has a 5 second
// timeout.
func NewPublisher(url string, client *http.Client) pubsub.Publisher {
func NewPublisher(url string, client *http.Client) Publisher {
if client == nil {
client = &http.Client{
Timeout: 5 * time.Second,
}
}
return publisher{url: url, client: client}
return Publisher{url: url, client: client}
}

// NewGCPStylePublisher will return a pubsub.Publisher that wraps the payload
// in a GCP pubsub.Message-like object that will make this publisher emulate
// Google's PubSub posting messages to a server.
// If no http.Client is provided, the default one has a 5 second
// timeout.
func NewGCPStylePublisher(url string, client *http.Client) pubsub.Publisher {
return gcpPublisher{NewPublisher(url, client)}
func NewGCPStylePublisher(url string, client *http.Client) GCPPublisher {
return GCPPublisher{NewPublisher(url, client)}
}

func (p publisher) Publish(ctx context.Context, key string, msg proto.Message) error {
// Publish will serialize the given message and pass it to PublishRaw.
func (p Publisher) Publish(ctx context.Context, key string, msg proto.Message) error {
payload, err := proto.Marshal(msg)
if err != nil {
return err
}
return p.PublishRaw(ctx, key, payload)
}

// PublishMulti will serialize the given messages and pass them to PublishMultiRaw.
func (p Publisher) PublishMulti(ctx context.Context, keys []string, msgs []proto.Message) error {
bmsgs := make([][]byte, len(msgs))
for i, msg := range msgs {
payload, err := proto.Marshal(msg)
if err != nil {
return err
}
bmsgs[i] = payload
}
return p.PublishMultiRaw(ctx, keys, bmsgs)
}

func (p publisher) PublishRaw(_ context.Context, _ string, payload []byte) error {
// PublishMultiRaw will call PublishRaw for each message given.
func (p Publisher) PublishMultiRaw(ctx context.Context, _ []string, msgs [][]byte) error {
for _, msg := range msgs {
err := p.PublishRaw(ctx, "", msg)
if err != nil {
return err
}
}
return nil
}

// PublishRaw will POST the given message payload at the URL provided in the Publisher
// construct.
func (p Publisher) PublishRaw(_ context.Context, _ string, payload []byte) error {
req, err := http.NewRequest("POST", p.url, bytes.NewReader(payload))
if err != nil {
return err
Expand Down Expand Up @@ -82,18 +111,45 @@ type message struct {
Data []byte `json:"data"`
}

func (p gcpPublisher) Publish(ctx context.Context, key string, msg proto.Message) error {
// Publish will serialize the given message and pass it to PublishRaw.
func (p GCPPublisher) Publish(ctx context.Context, key string, msg proto.Message) error {
payload, err := proto.Marshal(msg)
if err != nil {
return err
}
return p.PublishRaw(ctx, key, payload)
}

func (p gcpPublisher) PublishRaw(ctx context.Context, key string, msg []byte) error {
// PublishRaw will wrap the given message in a struct similar to GCP's push-style PubSub
// subscriptions and then POST the message payload at the URL provided in the construct.
func (p GCPPublisher) PublishRaw(ctx context.Context, key string, msg []byte) error {
payload, err := json.Marshal(gcpPayload{Message: message{Data: msg}})
if err != nil {
return err
}
return p.Publisher.PublishRaw(ctx, key, payload)
}

// PublishMulti will serialize the given messages and pass them to PublishMultiRaw.
func (p GCPPublisher) PublishMulti(ctx context.Context, keys []string, msgs []proto.Message) error {
bmsgs := make([][]byte, len(msgs))
for i, msg := range msgs {
payload, err := proto.Marshal(msg)
if err != nil {
return err
}
bmsgs[i] = payload
}
return p.PublishMultiRaw(ctx, keys, bmsgs)
}

// PublishMultiRaw will call PublishRaw for each message given.
func (p GCPPublisher) PublishMultiRaw(ctx context.Context, _ []string, msgs [][]byte) error {
for _, msg := range msgs {
err := p.PublishRaw(ctx, "", msg)
if err != nil {
return err
}
}
return nil
}
163 changes: 163 additions & 0 deletions pubsub/http/pub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"github.com/golang/protobuf/proto"
"github.com/google/go-cmp/cmp"
)

func TestPublishRaw(t *testing.T) {
Expand Down Expand Up @@ -258,3 +259,165 @@ func TestGCPPublish(t *testing.T) {
}

}

func TestPublishMulti(t *testing.T) {
tests := []struct {
givenPayloads []proto.Message
givenErr bool

wantPayloads []TestProto
wantErr bool
}{
{
givenPayloads: []proto.Message{
&TestProto{"hi there!"},
&TestProto{"howdy!"},
&TestProto{"hello!"},
},

wantPayloads: []TestProto{{"hi there!"},
TestProto{"howdy!"},
TestProto{"hello!"},
},
},
{
givenPayloads: []proto.Message{
&TestProto{"hi there!"},
&TestProto{"howdy!"},
&TestProto{"hello!"},
},

givenErr: true,
wantErr: true,
},
}

for _, test := range tests {
var resps []TestProto
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Errorf("unable to read published request body: %s", err)
}

var got TestProto
err = proto.Unmarshal(body, &got)
if err != nil {
t.Errorf("unable to proto marshal published request body: %s", err)
}

resps = append(resps, got)

if test.givenErr {
w.WriteHeader(http.StatusServiceUnavailable)
io.WriteString(w, "doh!")
return
}
w.WriteHeader(http.StatusOK)
}))

pub := NewPublisher(srv.URL, nil)

gotErr := pub.PublishMulti(nil, nil, test.givenPayloads)

if test.wantErr && gotErr == nil {
t.Errorf("expected error response from publish but got none")
}
if !test.wantErr && gotErr != nil {
t.Errorf("expected no error response from publish but got one: %s", gotErr)
}
srv.Close()

if gotErr != nil {
return
}

if !cmp.Equal(test.wantPayloads, resps) {
t.Errorf("payloads did not match expectations:\n\n%s", cmp.Diff(test.wantPayloads, resps))
}
}
}

func TestGCPPublishMulti(t *testing.T) {
tests := []struct {
givenPayloads []proto.Message
givenErr bool

wantPayloads []TestProto
wantErr bool
}{
{
givenPayloads: []proto.Message{
&TestProto{"hi there!"},
&TestProto{"howdy!"},
&TestProto{"hello!"},
},

wantPayloads: []TestProto{{"hi there!"},
TestProto{"howdy!"},
TestProto{"hello!"},
},
},
{
givenPayloads: []proto.Message{
&TestProto{"hi there!"},
&TestProto{"howdy!"},
&TestProto{"hello!"},
},

givenErr: true,
wantErr: true,
},
}

for _, test := range tests {
var resps []TestProto
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Errorf("unable to read published request body: %s", err)
}

var msg gcpPayload
err = json.Unmarshal(body, &msg)
if err != nil {
t.Errorf("unable to json marshal published request body: %s", err)
}

var got TestProto
err = proto.Unmarshal(msg.Message.Data, &got)
if err != nil {
t.Errorf("unable to proto marshal published request body: %s", err)
}

resps = append(resps, got)

if test.givenErr {
w.WriteHeader(http.StatusServiceUnavailable)
io.WriteString(w, "doh!")
return
}
w.WriteHeader(http.StatusOK)
}))

pub := NewGCPStylePublisher(srv.URL, nil)

gotErr := pub.PublishMulti(nil, nil, test.givenPayloads)

if test.wantErr && gotErr == nil {
t.Errorf("expected error response from publish but got none")
}
if !test.wantErr && gotErr != nil {
t.Errorf("expected no error response from publish but got one: %s", gotErr)
}
srv.Close()

if gotErr != nil {
return
}

if !cmp.Equal(test.wantPayloads, resps) {
t.Errorf("payloads did not match expectations:\n\n%s", cmp.Diff(test.wantPayloads, resps))
}
}
}

0 comments on commit 66323c4

Please sign in to comment.