Skip to content

Commit

Permalink
🐛 Using Content Type / Encoding from Delivery (#58)
Browse files Browse the repository at this point in the history
* 🐛 Using Content Type / Encoding from Delivery
* 🐛 Bug Fix in client related to nil body
* ✅ Fixed Test
  • Loading branch information
Templum authored Oct 14, 2020
1 parent f0fe91d commit 43976f8
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 57 deletions.
5 changes: 3 additions & 2 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package main

import (
"fmt"
"github.com/Templum/rabbitmq-connector/pkg/types"
"log"
"os"
"sync"
Expand Down Expand Up @@ -104,11 +105,11 @@ func newInvokerMock() *invokerMock {
return &invokerMock{counter: 0}
}

func (m *invokerMock) Invoke(topic string, message []byte) {
func (m *invokerMock) Invoke(topic string, invoke *types.OpenFaaSInvocation) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.counter++
invocation := NewInvocation(topic, &message, m.counter)
invocation := NewInvocation(topic, invoke.Message, m.counter)
m.invocations = append(m.invocations, invocation)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/openfaas/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
func TestTopicMap(t *testing.T) {
t.Parallel()

update := map[string][]string{"billing": []string{"taxes", "notify"}}
update := map[string][]string{"billing": {"taxes", "notify"}}

t.Run("Should override cache with update", func(t *testing.T) {
cache := NewTopicFunctionCache()
Expand Down
5 changes: 3 additions & 2 deletions pkg/openfaas/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package openfaas

import (
"context"
types2 "github.com/Templum/rabbitmq-connector/pkg/types"
"log"
"strings"
"time"
Expand Down Expand Up @@ -42,11 +43,11 @@ func (c *Controller) Start(ctx context.Context) {
}

// Invoke triggers a call to all functions registered to the specified topic
func (c *Controller) Invoke(topic string, message []byte) {
func (c *Controller) Invoke(topic string, invocation *types2.OpenFaaSInvocation) {
functions := c.cache.GetCachedValues(topic)

for _, fn := range functions {
go c.client.InvokeSync(context.Background(), fn, message)
go c.client.InvokeSync(context.Background(), fn, invocation)
}
}

Expand Down
15 changes: 8 additions & 7 deletions pkg/openfaas/cacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package openfaas

import (
"context"
types2 "github.com/Templum/rabbitmq-connector/pkg/types"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -49,11 +50,11 @@ func (m *MockOpenFaaSClient) InvokeCalledNTimes() int {
return m.invocation
}

func (m *MockOpenFaaSClient) InvokeAsync(ctx context.Context, name string, payload []byte) (bool, error) {
func (m *MockOpenFaaSClient) InvokeAsync(ctx context.Context, name string, invocation *types2.OpenFaaSInvocation) (bool, error) {
return true, nil
}

func (m *MockOpenFaaSClient) InvokeSync(ctx context.Context, name string, payload []byte) ([]byte, error) {
func (m *MockOpenFaaSClient) InvokeSync(ctx context.Context, name string, invocation *types2.OpenFaaSInvocation) ([]byte, error) {
m.lock.Lock()
defer m.lock.Unlock()

Expand Down Expand Up @@ -86,7 +87,7 @@ func TestCacher_Start_WithNs(t *testing.T) {
annotations := map[string]string{"topic": "billing,secret,transport"}

fnFaaSNs := []types.FunctionStatus{
types.FunctionStatus{
{
Name: "biller",
Image: "docker:image",
InvocationCount: 0,
Expand All @@ -97,7 +98,7 @@ func TestCacher_Start_WithNs(t *testing.T) {
Annotations: &annotations,
Namespace: "faas",
},
types.FunctionStatus{
{
Name: "secrter",
Image: "docker:image",
InvocationCount: 0,
Expand All @@ -111,7 +112,7 @@ func TestCacher_Start_WithNs(t *testing.T) {
}

fnTestNs := []types.FunctionStatus{
types.FunctionStatus{
{
Name: "transporter",
Image: "docker:image",
InvocationCount: 0,
Expand Down Expand Up @@ -166,7 +167,7 @@ func TestCacher_Start_Normal(t *testing.T) {
annotations := map[string]string{"topic": "billing,secret,transport"}

functions := []types.FunctionStatus{
types.FunctionStatus{
{
Name: "function-name",
Image: "docker:image",
InvocationCount: 0,
Expand All @@ -177,7 +178,7 @@ func TestCacher_Start_Normal(t *testing.T) {
Annotations: &annotations,
Namespace: "faas",
},
types.FunctionStatus{
{
Name: "wrencher",
Image: "docker:image",
InvocationCount: 0,
Expand Down
34 changes: 28 additions & 6 deletions pkg/openfaas/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"context"
"encoding/json"
"fmt"
types2 "github.com/Templum/rabbitmq-connector/pkg/types"
"io"
"io/ioutil"
"net/http"

Expand All @@ -18,8 +20,8 @@ import (

// Invoker defines interfaces that invoke deployed OpenFaaS Functions.
type Invoker interface {
InvokeSync(ctx context.Context, name string, payload []byte) ([]byte, error)
InvokeAsync(ctx context.Context, name string, payload []byte) (bool, error)
InvokeSync(ctx context.Context, name string, invocation *types2.OpenFaaSInvocation) ([]byte, error)
InvokeAsync(ctx context.Context, name string, invocation *types2.OpenFaaSInvocation) (bool, error)
}

// NamespaceFetcher defines interfaces to explore namespaces of an OpenFaaS installation.
Expand Down Expand Up @@ -58,10 +60,20 @@ func NewClient(client *http.Client, creds *auth.BasicAuthCredentials, gatewayURL
}

// InvokeSync calls a given function in a synchronous way waiting for the response using the provided payload while considering the provided context
func (c *Client) InvokeSync(ctx context.Context, name string, payload []byte) ([]byte, error) { // TODO: either reuse provided payload or make it parseable
func (c *Client) InvokeSync(ctx context.Context, name string, invocation *types2.OpenFaaSInvocation) ([]byte, error) { // TODO: either reuse provided payload or make it parseable
functionURL := fmt.Sprintf("%s/function/%s", c.url, name)

req, _ := http.NewRequestWithContext(ctx, http.MethodPost, functionURL, bytes.NewReader(payload))
var body io.Reader
if invocation.Message != nil {
body = bytes.NewReader(*invocation.Message)
} else {
body = nil
}

req, _ := http.NewRequestWithContext(ctx, http.MethodPost, functionURL, body)
req.Header.Set("Content-Type", invocation.ContentType)
req.Header.Set("Content-Encoding", invocation.ContentEncoding)

if c.credentials != nil {
req.SetBasicAuth(c.credentials.User, c.credentials.Password)
}
Expand Down Expand Up @@ -95,10 +107,20 @@ func (c *Client) InvokeSync(ctx context.Context, name string, payload []byte) ([
}

// InvokeAsync calls a given function in a asynchronous way waiting for the response using the provided payload while considering the provided context
func (c *Client) InvokeAsync(ctx context.Context, name string, payload []byte) (bool, error) { // TODO: either reuse provided payload or make it parseable
func (c *Client) InvokeAsync(ctx context.Context, name string, invocation *types2.OpenFaaSInvocation) (bool, error) {
functionURL := fmt.Sprintf("%s/async-function/%s", c.url, name)

req, _ := http.NewRequestWithContext(ctx, http.MethodPost, functionURL, bytes.NewReader(payload))
var body io.Reader
if invocation.Message != nil {
body = bytes.NewReader(*invocation.Message)
} else {
body = nil
}

req, _ := http.NewRequestWithContext(ctx, http.MethodPost, functionURL, body)
req.Header.Set("Content-Type", invocation.ContentType)
req.Header.Set("Content-Encoding", invocation.ContentEncoding)

if c.credentials != nil {
req.SetBasicAuth(c.credentials.User, c.credentials.Password)
}
Expand Down
61 changes: 42 additions & 19 deletions pkg/openfaas/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
types2 "github.com/Templum/rabbitmq-connector/pkg/types"
"net/http"
"net/http/httptest"
"testing"
Expand Down Expand Up @@ -52,39 +53,50 @@ func TestClient_InvokeSync(t *testing.T) {
Password: "Invalid",
}, server.URL)

message := []byte("Test")
payload := types2.OpenFaaSInvocation{
Topic: "",
Message: &message,
ContentEncoding: "gzip",
ContentType: "text/plain",
}
nilPayload := types2.OpenFaaSInvocation{
Topic: "",
Message: nil,
ContentEncoding: "gzip",
ContentType: "text/plain",
}

t.Parallel()

t.Run("Should invoke the specified function", func(t *testing.T) {
payload := []byte("Test")
resp, err := openfaasClient.InvokeSync(context.Background(), "exists", payload)
resp, err := openfaasClient.InvokeSync(context.Background(), "exists", &payload)

assert.Nil(t, err, "Should not fail")
assert.Equal(t, string(resp), expectedResponse, "Did not receive expected response")
})

t.Run("Should except nil as body", func(t *testing.T) {
resp, err := openfaasClient.InvokeSync(context.Background(), "exists", nil)
resp, err := openfaasClient.InvokeSync(context.Background(), "exists", &nilPayload)

assert.Nil(t, err, "Should not fail")
assert.Equal(t, string(resp), expectedResponse, "Did not receive expected response")
})

t.Run("Should throw error if function does not exist", func(t *testing.T) {
payload := []byte("Test")
_, err := openfaasClient.InvokeSync(context.Background(), "nonexisting", payload)
_, err := openfaasClient.InvokeSync(context.Background(), "nonexisting", &payload)

assert.Error(t, err, "Function nonexisting is not deployed", "Did receive unexpected error")
})

t.Run("Should throw error if unauthorized", func(t *testing.T) {
_, err := authenticatedOpenFaaSClient.InvokeSync(context.Background(), "exists", nil)
_, err := authenticatedOpenFaaSClient.InvokeSync(context.Background(), "exists", &nilPayload)

assert.Error(t, err, "OpenFaaS Credentials are invalid", "Did receive unexpected error")
})

t.Run("Should throw error on unexpected status code", func(t *testing.T) {
payload := []byte("Test")
_, err := openfaasClient.InvokeSync(context.Background(), "internal", payload)
_, err := openfaasClient.InvokeSync(context.Background(), "internal", &payload)

assert.Error(t, err, "Received unexpected Status Code 500", "Did receive unexpected error")
})
Expand Down Expand Up @@ -128,39 +140,50 @@ func TestClient_InvokeAsync(t *testing.T) {
Password: "Invalid",
}, server.URL)

message := []byte("Test")
payload := types2.OpenFaaSInvocation{
Topic: "",
Message: &message,
ContentEncoding: "gzip",
ContentType: "text/plain",
}
nilPayload := types2.OpenFaaSInvocation{
Topic: "",
Message: nil,
ContentEncoding: "gzip",
ContentType: "text/plain",
}

t.Parallel()

t.Run("Should invoke the specified function", func(t *testing.T) {
payload := []byte("Test")
ok, err := openfaasClient.InvokeAsync(context.Background(), "exists", payload)
ok, err := openfaasClient.InvokeAsync(context.Background(), "exists", &payload)

assert.Nil(t, err, "Should not fail")
assert.Equal(t, ok, true, "Did not receive expected response")
})

t.Run("Should except nil as body", func(t *testing.T) {
ok, err := openfaasClient.InvokeAsync(context.Background(), "exists", nil)
ok, err := openfaasClient.InvokeAsync(context.Background(), "exists", &nilPayload)

assert.Nil(t, err, "Should not fail")
assert.Equal(t, ok, true, "Did not receive expected response")
})

t.Run("Should throw error if function does not exist", func(t *testing.T) {
payload := []byte("Test")
_, err := openfaasClient.InvokeAsync(context.Background(), "nonexisting", payload)
_, err := openfaasClient.InvokeAsync(context.Background(), "nonexisting", &payload)

assert.Error(t, err, "Function nonexisting is not deployed", "Did receive unexpected error")
})

t.Run("Should throw error if unauthorized", func(t *testing.T) {
_, err := authenticatedOpenFaaSClient.InvokeAsync(context.Background(), "exists", nil)
_, err := authenticatedOpenFaaSClient.InvokeAsync(context.Background(), "exists", &nilPayload)

assert.Error(t, err, "OpenFaaS Credentials are invalid", "Did receive unexpected error")
})

t.Run("Should throw error on unexpected status code", func(t *testing.T) {
payload := []byte("Test")
_, err := openfaasClient.InvokeAsync(context.Background(), "internal", payload)
_, err := openfaasClient.InvokeAsync(context.Background(), "internal", &payload)

assert.Error(t, err, "Received unexpected Status Code 500", "Did receive unexpected error")
})
Expand Down Expand Up @@ -245,7 +268,7 @@ func TestClient_HasNamespaceSupport(t *testing.T) {
func TestClient_GetFunctions(t *testing.T) {
server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
allFn := []types.FunctionStatus{
types.FunctionStatus{
{
Name: "function-name",
Image: "docker:image",
InvocationCount: 0,
Expand All @@ -256,7 +279,7 @@ func TestClient_GetFunctions(t *testing.T) {
Annotations: nil,
Namespace: "faas",
},
types.FunctionStatus{
{
Name: "wrencher",
Image: "docker:image",
InvocationCount: 0,
Expand All @@ -270,7 +293,7 @@ func TestClient_GetFunctions(t *testing.T) {
}

namespacedFn := []types.FunctionStatus{
types.FunctionStatus{
{
Name: "wrencher",
Image: "docker:image",
InvocationCount: 0,
Expand Down
2 changes: 1 addition & 1 deletion pkg/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (s *subscriber) Start() error {
for invocation := range invocations {
if s.topic == invocation.Topic {
go func() {
s.client.Invoke(s.topic, *invocation.Message)
s.client.Invoke(s.topic, invocation)
log.Printf("Finished invocations of functions on topic %s", s.topic)
}()
} else {
Expand Down
Loading

0 comments on commit 43976f8

Please sign in to comment.