From 66323c4e8342edc560a7e993f9a77123a8445bc3 Mon Sep 17 00:00:00 2001 From: JP Robinson Date: Tue, 9 Oct 2018 16:17:35 -0400 Subject: [PATCH] expanding HTTP publishers to implement the MultiPublisher interface (#155) * expanding HTTP publishers to implement the MultiPublisher interface * adding tests --- pubsub/http/pub.go | 76 ++++++++++++++++--- pubsub/http/pub_test.go | 163 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 229 insertions(+), 10 deletions(-) diff --git a/pubsub/http/pub.go b/pubsub/http/pub.go index dd08a739a..d7490dd85 100644 --- a/pubsub/http/pub.go +++ b/pubsub/http/pub.go @@ -13,25 +13,28 @@ import ( "golang.org/x/net/context" ) -type publisher struct { +// Publisher implements the pubsub.Publisher and MultiPublisher interfaces for use in a +// plain HTTP environment. +type Publisher struct { url string client *http.Client } -type gcpPublisher struct { +// GCPPublisher publishes data in the same format as a GCP push-style payload. +type GCPPublisher struct { pubsub.Publisher } // NewPublisher will return a pubsub.Publisher that simply posts the payload to // the given URL. If no http.Client is provided, the default one has a 5 second // timeout. -func NewPublisher(url string, client *http.Client) pubsub.Publisher { +func NewPublisher(url string, client *http.Client) Publisher { if client == nil { client = &http.Client{ Timeout: 5 * time.Second, } } - return publisher{url: url, client: client} + return Publisher{url: url, client: client} } // NewGCPStylePublisher will return a pubsub.Publisher that wraps the payload @@ -39,20 +42,46 @@ func NewPublisher(url string, client *http.Client) pubsub.Publisher { // Google's PubSub posting messages to a server. // If no http.Client is provided, the default one has a 5 second // timeout. -func NewGCPStylePublisher(url string, client *http.Client) pubsub.Publisher { - return gcpPublisher{NewPublisher(url, client)} +func NewGCPStylePublisher(url string, client *http.Client) GCPPublisher { + return GCPPublisher{NewPublisher(url, client)} } -func (p publisher) Publish(ctx context.Context, key string, msg proto.Message) error { +// Publish will serialize the given message and pass it to PublishRaw. +func (p Publisher) Publish(ctx context.Context, key string, msg proto.Message) error { payload, err := proto.Marshal(msg) if err != nil { return err } return p.PublishRaw(ctx, key, payload) +} +// PublishMulti will serialize the given messages and pass them to PublishMultiRaw. +func (p Publisher) PublishMulti(ctx context.Context, keys []string, msgs []proto.Message) error { + bmsgs := make([][]byte, len(msgs)) + for i, msg := range msgs { + payload, err := proto.Marshal(msg) + if err != nil { + return err + } + bmsgs[i] = payload + } + return p.PublishMultiRaw(ctx, keys, bmsgs) } -func (p publisher) PublishRaw(_ context.Context, _ string, payload []byte) error { +// PublishMultiRaw will call PublishRaw for each message given. +func (p Publisher) PublishMultiRaw(ctx context.Context, _ []string, msgs [][]byte) error { + for _, msg := range msgs { + err := p.PublishRaw(ctx, "", msg) + if err != nil { + return err + } + } + return nil +} + +// PublishRaw will POST the given message payload at the URL provided in the Publisher +// construct. +func (p Publisher) PublishRaw(_ context.Context, _ string, payload []byte) error { req, err := http.NewRequest("POST", p.url, bytes.NewReader(payload)) if err != nil { return err @@ -82,7 +111,8 @@ type message struct { Data []byte `json:"data"` } -func (p gcpPublisher) Publish(ctx context.Context, key string, msg proto.Message) error { +// Publish will serialize the given message and pass it to PublishRaw. +func (p GCPPublisher) Publish(ctx context.Context, key string, msg proto.Message) error { payload, err := proto.Marshal(msg) if err != nil { return err @@ -90,10 +120,36 @@ func (p gcpPublisher) Publish(ctx context.Context, key string, msg proto.Message return p.PublishRaw(ctx, key, payload) } -func (p gcpPublisher) PublishRaw(ctx context.Context, key string, msg []byte) error { +// PublishRaw will wrap the given message in a struct similar to GCP's push-style PubSub +// subscriptions and then POST the message payload at the URL provided in the construct. +func (p GCPPublisher) PublishRaw(ctx context.Context, key string, msg []byte) error { payload, err := json.Marshal(gcpPayload{Message: message{Data: msg}}) if err != nil { return err } return p.Publisher.PublishRaw(ctx, key, payload) } + +// PublishMulti will serialize the given messages and pass them to PublishMultiRaw. +func (p GCPPublisher) PublishMulti(ctx context.Context, keys []string, msgs []proto.Message) error { + bmsgs := make([][]byte, len(msgs)) + for i, msg := range msgs { + payload, err := proto.Marshal(msg) + if err != nil { + return err + } + bmsgs[i] = payload + } + return p.PublishMultiRaw(ctx, keys, bmsgs) +} + +// PublishMultiRaw will call PublishRaw for each message given. +func (p GCPPublisher) PublishMultiRaw(ctx context.Context, _ []string, msgs [][]byte) error { + for _, msg := range msgs { + err := p.PublishRaw(ctx, "", msg) + if err != nil { + return err + } + } + return nil +} diff --git a/pubsub/http/pub_test.go b/pubsub/http/pub_test.go index 9bab9a80f..6dcda3e1a 100644 --- a/pubsub/http/pub_test.go +++ b/pubsub/http/pub_test.go @@ -10,6 +10,7 @@ import ( "testing" "github.com/golang/protobuf/proto" + "github.com/google/go-cmp/cmp" ) func TestPublishRaw(t *testing.T) { @@ -258,3 +259,165 @@ func TestGCPPublish(t *testing.T) { } } + +func TestPublishMulti(t *testing.T) { + tests := []struct { + givenPayloads []proto.Message + givenErr bool + + wantPayloads []TestProto + wantErr bool + }{ + { + givenPayloads: []proto.Message{ + &TestProto{"hi there!"}, + &TestProto{"howdy!"}, + &TestProto{"hello!"}, + }, + + wantPayloads: []TestProto{{"hi there!"}, + TestProto{"howdy!"}, + TestProto{"hello!"}, + }, + }, + { + givenPayloads: []proto.Message{ + &TestProto{"hi there!"}, + &TestProto{"howdy!"}, + &TestProto{"hello!"}, + }, + + givenErr: true, + wantErr: true, + }, + } + + for _, test := range tests { + var resps []TestProto + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + t.Errorf("unable to read published request body: %s", err) + } + + var got TestProto + err = proto.Unmarshal(body, &got) + if err != nil { + t.Errorf("unable to proto marshal published request body: %s", err) + } + + resps = append(resps, got) + + if test.givenErr { + w.WriteHeader(http.StatusServiceUnavailable) + io.WriteString(w, "doh!") + return + } + w.WriteHeader(http.StatusOK) + })) + + pub := NewPublisher(srv.URL, nil) + + gotErr := pub.PublishMulti(nil, nil, test.givenPayloads) + + if test.wantErr && gotErr == nil { + t.Errorf("expected error response from publish but got none") + } + if !test.wantErr && gotErr != nil { + t.Errorf("expected no error response from publish but got one: %s", gotErr) + } + srv.Close() + + if gotErr != nil { + return + } + + if !cmp.Equal(test.wantPayloads, resps) { + t.Errorf("payloads did not match expectations:\n\n%s", cmp.Diff(test.wantPayloads, resps)) + } + } +} + +func TestGCPPublishMulti(t *testing.T) { + tests := []struct { + givenPayloads []proto.Message + givenErr bool + + wantPayloads []TestProto + wantErr bool + }{ + { + givenPayloads: []proto.Message{ + &TestProto{"hi there!"}, + &TestProto{"howdy!"}, + &TestProto{"hello!"}, + }, + + wantPayloads: []TestProto{{"hi there!"}, + TestProto{"howdy!"}, + TestProto{"hello!"}, + }, + }, + { + givenPayloads: []proto.Message{ + &TestProto{"hi there!"}, + &TestProto{"howdy!"}, + &TestProto{"hello!"}, + }, + + givenErr: true, + wantErr: true, + }, + } + + for _, test := range tests { + var resps []TestProto + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + t.Errorf("unable to read published request body: %s", err) + } + + var msg gcpPayload + err = json.Unmarshal(body, &msg) + if err != nil { + t.Errorf("unable to json marshal published request body: %s", err) + } + + var got TestProto + err = proto.Unmarshal(msg.Message.Data, &got) + if err != nil { + t.Errorf("unable to proto marshal published request body: %s", err) + } + + resps = append(resps, got) + + if test.givenErr { + w.WriteHeader(http.StatusServiceUnavailable) + io.WriteString(w, "doh!") + return + } + w.WriteHeader(http.StatusOK) + })) + + pub := NewGCPStylePublisher(srv.URL, nil) + + gotErr := pub.PublishMulti(nil, nil, test.givenPayloads) + + if test.wantErr && gotErr == nil { + t.Errorf("expected error response from publish but got none") + } + if !test.wantErr && gotErr != nil { + t.Errorf("expected no error response from publish but got one: %s", gotErr) + } + srv.Close() + + if gotErr != nil { + return + } + + if !cmp.Equal(test.wantPayloads, resps) { + t.Errorf("payloads did not match expectations:\n\n%s", cmp.Diff(test.wantPayloads, resps)) + } + } +}