diff --git a/.github/README.md b/.github/README.md index 76d3cec5..7b45ad07 100644 --- a/.github/README.md +++ b/.github/README.md @@ -1,8 +1,6 @@ # GoAws [![Build Status](https://travis-ci.org/p4tin/goaws.svg?branch=master)](https://travis-ci.org/p4tin/goaws) -You are always welcome to [tweet the creator in chief](https://twitter.com/gocodecloud) or [buy him a coffee](https://www.paypal.me/p4tin) - Written in Go this is a clone of the AWS SQS/SNS systems. This system is designed to emulate SQS and SNS in a local environment so developers can test their interfaces without having to connect to the AWS Cloud and possibly incurring the expense, or even worse actually write to production topics/queues by mistake. If you see any problems or would like to see a new feature, please open an issue here in github. As well, I will logon to Gitter so we can discuss your deployment issues or the weather. diff --git a/app/conf/mock-data/mock-config.yaml b/app/conf/mock-data/mock-config.yaml index 45ecd342..b8bf9ceb 100644 --- a/app/conf/mock-data/mock-config.yaml +++ b/app/conf/mock-data/mock-config.yaml @@ -59,10 +59,21 @@ BaseUnitTests: - Name: unit-queue2 RedrivePolicy: '{"maxReceiveCount": 100, "deadLetterTargetArn":"arn:aws:sqs:us-east-1:100010001000:other-queue1"}' - Name: other-queue1 - - Name: subscribed-queue2 + - Name: subscribed-queue1 + - Name: subscribed-queue3 Topics: - Name: unit-topic1 Subscriptions: - - QueueName: subscribed-queue2 + - QueueName: subscribed-queue1 Raw: true - Name: unit-topic2 + - Name: unit-topic3 + Subscriptions: + - QueueName: subscribed-queue3 + Raw: false + - Name: unit-topic-http + Subscriptions: + - Protocol: http + EndPoint: http://over.ride.me/for/tests + TopicArn: arn:aws:sqs:region:accountID:unit-topic-http + Raw: true diff --git a/app/gosns/gosns.go b/app/gosns/gosns.go index 8f3ef1b5..16b0f038 100644 --- a/app/gosns/gosns.go +++ b/app/gosns/gosns.go @@ -329,137 +329,8 @@ func DeleteTopic(w http.ResponseWriter, req *http.Request) { } -// aws --endpoint-url http://localhost:47194 sns publish --topic-arn arn:aws:sns:yopa-local:000000000000:test1 --message "This is a test" -func Publish(w http.ResponseWriter, req *http.Request) { - content := req.FormValue("ContentType") - topicArn := req.FormValue("TopicArn") - subject := req.FormValue("Subject") - messageBody := req.FormValue("Message") - messageStructure := req.FormValue("MessageStructure") - messageAttributes := getMessageAttributesFromRequest(req) - - arnSegments := strings.Split(topicArn, ":") - topicName := arnSegments[len(arnSegments)-1] - - _, ok := app.SyncTopics.Topics[topicName] - if ok { - log.WithFields(log.Fields{ - "topic": topicName, - "topicArn": topicArn, - "subject": subject, - }).Debug("Publish to Topic") - for _, subs := range app.SyncTopics.Topics[topicName].Subscriptions { - switch app.Protocol(subs.Protocol) { - case app.ProtocolSQS: - publishSQS(w, req, subs, messageBody, messageAttributes, subject, topicArn, topicName, messageStructure) - case app.ProtocolHTTP: - fallthrough - case app.ProtocolHTTPS: - publishHTTP(subs, messageBody, messageAttributes, subject, topicArn) - } - } - } else { - createErrorResponse(w, req, "TopicNotFound") - return - } - - //Create the response - msgId, _ := common.NewUUID() - uuid, _ := common.NewUUID() - respStruct := app.PublishResponse{Xmlns: "http://queue.amazonaws.com/doc/2012-11-05/", Result: app.PublishResult{MessageId: msgId}, Metadata: app.ResponseMetadata{RequestId: uuid}} - SendResponseBack(w, req, respStruct, content) -} - -func publishSQS(w http.ResponseWriter, req *http.Request, - subs *app.Subscription, messageBody string, messageAttributes map[string]app.MessageAttributeValue, - subject string, topicArn string, topicName string, messageStructure string) { - if subs.FilterPolicy != nil && !subs.FilterPolicy.IsSatisfiedBy(messageAttributes) { - return - } - - endPoint := subs.EndPoint - uriSegments := strings.Split(endPoint, "/") - queueName := uriSegments[len(uriSegments)-1] - arnSegments := strings.Split(queueName, ":") - queueName = arnSegments[len(arnSegments)-1] - - if _, ok := app.SyncQueues.Queues[queueName]; ok { - msg := app.Message{} - - if subs.Raw == false { - m, err := CreateMessageBody(subs, messageBody, subject, messageStructure, messageAttributes) - if err != nil { - createErrorResponse(w, req, err.Error()) - return - } - - msg.MessageBody = m - } else { - msg.MessageAttributes = messageAttributes - msg.MD5OfMessageAttributes = common.HashAttributes(messageAttributes) - m, err := extractMessageFromJSON(messageBody, subs.Protocol) - if err == nil { - msg.MessageBody = []byte(m) - } else { - msg.MessageBody = []byte(messageBody) - } - } - - msg.MD5OfMessageBody = common.GetMD5Hash(messageBody) - msg.Uuid, _ = common.NewUUID() - app.SyncQueues.Lock() - app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages, msg) - app.SyncQueues.Unlock() - - log.Infof("%s: Topic: %s(%s), Message: %s\n", time.Now().Format("2006-01-02 15:04:05"), topicName, queueName, msg.MessageBody) - } else { - log.Infof("%s: Queue %s does not exist, message discarded\n", time.Now().Format("2006-01-02 15:04:05"), queueName) - } -} - -func publishHTTP(subs *app.Subscription, messageBody string, messageAttributes map[string]app.MessageAttributeValue, - subject string, topicArn string) { - id, _ := common.NewUUID() - msg := app.SNSMessage{ - Type: "Notification", - MessageId: id, - TopicArn: topicArn, - Subject: subject, - Message: messageBody, - Timestamp: time.Now().UTC().Format(time.RFC3339), - SignatureVersion: "1", - SigningCertURL: "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port + "/SimpleNotificationService/" + id + ".pem", - UnsubscribeURL: "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port + "/?Action=Unsubscribe&SubscriptionArn=" + subs.SubscriptionArn, - MessageAttributes: formatAttributes(messageAttributes), - } - - signature, err := signMessage(PrivateKEY, &msg) - if err != nil { - log.Error(err) - } else { - msg.Signature = signature - } - err = callEndpoint(subs.EndPoint, subs.SubscriptionArn, msg, subs.Raw) - if err != nil { - log.WithFields(log.Fields{ - "EndPoint": subs.EndPoint, - "ARN": subs.SubscriptionArn, - "error": err.Error(), - }).Error("Error calling endpoint") - } -} - -func formatAttributes(values map[string]app.MessageAttributeValue) map[string]app.MsgAttr { - attr := make(map[string]app.MsgAttr) - for k, v := range values { - attr[k] = app.MsgAttr{ - Type: v.DataType, - Value: v.Value, - } - } - return attr -} - +// NOTE: The use case for this is to use GoAWS to call some external system with the message payload. Essentially +// it is a localized subscription to some non-AWS endpoint. func callEndpoint(endpoint string, subArn string, msg app.SNSMessage, raw bool) error { log.WithFields(log.Fields{ "sns": msg, @@ -524,75 +395,6 @@ func callEndpoint(endpoint string, subArn string, msg app.SNSMessage, raw bool) return nil } -func getMessageAttributesFromRequest(req *http.Request) map[string]app.MessageAttributeValue { - attributes := make(map[string]app.MessageAttributeValue) - - for i := 1; true; i++ { - name := req.FormValue(fmt.Sprintf("MessageAttributes.entry.%d.Name", i)) - if name == "" { - break - } - - dataType := req.FormValue(fmt.Sprintf("MessageAttributes.entry.%d.Value.DataType", i)) - if dataType == "" { - log.Warnf("DataType of MessageAttribute %s is missing, MD5 checksum will most probably be wrong!\n", name) - continue - } - - // StringListValue and BinaryListValue is currently not implemented - for _, valueKey := range [...]string{"StringValue", "BinaryValue"} { - value := req.FormValue(fmt.Sprintf("MessageAttributes.entry.%d.Value.%s", i, valueKey)) - if value != "" { - attributes[name] = app.MessageAttributeValue{Name: name, DataType: dataType, Value: value, ValueKey: valueKey} - } - } - - if _, ok := attributes[name]; !ok { - log.Warnf("StringValue or BinaryValue of MessageAttribute %s is missing, MD5 checksum will most probably be wrong!\n", name) - } - } - - return attributes -} - -func CreateMessageBody(subs *app.Subscription, msg string, subject string, messageStructure string, - messageAttributes map[string]app.MessageAttributeValue) ([]byte, error) { - - msgId, _ := common.NewUUID() - - message := app.SNSMessage{ - Type: "Notification", - MessageId: msgId, - TopicArn: subs.TopicArn, - Subject: subject, - Timestamp: time.Now().UTC().Format(time.RFC3339), - SignatureVersion: "1", - SigningCertURL: "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port + "/SimpleNotificationService/" + msgId + ".pem", - UnsubscribeURL: "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port + "/?Action=Unsubscribe&SubscriptionArn=" + subs.SubscriptionArn, - MessageAttributes: formatAttributes(messageAttributes), - } - - if app.MessageStructure(messageStructure) == app.MessageStructureJSON { - m, err := extractMessageFromJSON(msg, subs.Protocol) - if err != nil { - return nil, err - } - message.Message = m - } else { - message.Message = msg - } - - signature, err := signMessage(PrivateKEY, &message) - if err != nil { - log.Error(err) - } else { - message.Signature = signature - } - - byteMsg, _ := json.Marshal(message) - return byteMsg, nil -} - func extractMessageFromJSON(msg string, protocol string) (string, error) { var msgWithProtocols map[string]string if err := json.Unmarshal([]byte(msg), &msgWithProtocols); err != nil { diff --git a/app/gosns/gosns_create_message_test.go b/app/gosns/gosns_create_message_test.go index f2080522..a859d5d0 100644 --- a/app/gosns/gosns_create_message_test.go +++ b/app/gosns/gosns_create_message_test.go @@ -15,6 +15,8 @@ const ( messageAttributesKey = "MessageAttributes" ) +// TODO - Admiral-Piett - merge these with `publish_test.go` + // When simple message string is passed, // it must be used for all subscribers (no matter the protocol) func TestCreateMessageBody_NonJson(t *testing.T) { @@ -27,7 +29,7 @@ func TestCreateMessageBody_NonJson(t *testing.T) { Raw: false, } - snsMessage, err := CreateMessageBody(subs, message, subject, messageStructureEmpty, make(map[string]app.MessageAttributeValue)) + snsMessage, err := createMessageBody(subs, message, subject, messageStructureEmpty, make(map[string]app.MessageAttributeValue)) if err != nil { t.Fatalf(`error creating SNS message: %s`, err) } @@ -69,7 +71,7 @@ func TestCreateMessageBody_OnlyDefaultValueInJson(t *testing.T) { message := `{"default": "default message text", "http": "HTTP message text"}` subject := "subject" - snsMessage, err := CreateMessageBody(subs, message, subject, messageStructureJSON, nil) + snsMessage, err := createMessageBody(subs, message, subject, messageStructureJSON, nil) if err != nil { t.Fatalf(`error creating SNS message: %s`, err) } @@ -112,7 +114,7 @@ func TestCreateMessageBody_OnlySqsValueInJson(t *testing.T) { message := `{"sqs": "message text"}` subject := "subject" - snsMessage, err := CreateMessageBody(subs, message, subject, messageStructureJSON, nil) + snsMessage, err := createMessageBody(subs, message, subject, messageStructureJSON, nil) if err == nil { t.Fatalf(`error expected but instead SNS message was returned: %s`, snsMessage) } @@ -130,7 +132,7 @@ func TestCreateMessageBody_BothDefaultAndSqsValuesInJson(t *testing.T) { message := `{"default": "default message text", "sqs": "sqs message text"}` subject := "subject" - snsMessage, err := CreateMessageBody(subs, message, subject, messageStructureJSON, nil) + snsMessage, err := createMessageBody(subs, message, subject, messageStructureJSON, nil) if err != nil { t.Fatalf(`error creating SNS message: %s`, err) } @@ -173,7 +175,7 @@ func TestCreateMessageBody_NonJsonContainingJson(t *testing.T) { message := `{"default": "default message text", "sqs": "sqs message text"}` subject := "subject" - snsMessage, err := CreateMessageBody(subs, message, subject, "", nil) + snsMessage, err := createMessageBody(subs, message, subject, "", nil) if err != nil { t.Fatalf(`error creating SNS message: %s`, err) } @@ -219,7 +221,7 @@ func TestCreateMessageBody_WithMessageAttributes(t *testing.T) { attributes := map[string]app.MessageAttributeValue{ stringMessageAttributeValue.DataType: stringMessageAttributeValue, } - snsMessage, err := CreateMessageBody(subs, message, subject, messageStructureEmpty, attributes) + snsMessage, err := createMessageBody(subs, message, subject, messageStructureEmpty, attributes) if err != nil { t.Fatalf(`error creating SNS message: %s`, err) } diff --git a/app/gosns/gosns_test.go b/app/gosns/gosns_test.go index 2e64c2e2..a5ddce98 100644 --- a/app/gosns/gosns_test.go +++ b/app/gosns/gosns_test.go @@ -45,241 +45,6 @@ func TestListTopicshandler_POST_NoTopics(t *testing.T) { } } -func TestPublishhandler_POST_SendMessage(t *testing.T) { - defer func() { - test.ResetApp() - }() - - // Create a request to pass to our handler. We don't have any query parameters for now, so we'll - // pass 'nil' as the third parameter. - req, err := http.NewRequest("POST", "/", nil) - if err != nil { - t.Fatal(err) - } - - form := url.Values{} - form.Add("TopicArn", "arn:aws:sns:local:000000000000:UnitTestTopic1") - form.Add("Message", "TestMessage1") - req.PostForm = form - - // Prepare existant topic - topic := &app.Topic{ - Name: "UnitTestTopic1", - Arn: "arn:aws:sns:local:000000000000:UnitTestTopic1", - } - app.SyncTopics.Topics["UnitTestTopic1"] = topic - - // We create a ResponseRecorder (which satisfies http.ResponseWriter) to record the response. - rr := httptest.NewRecorder() - handler := http.HandlerFunc(Publish) - - // Our handlers satisfy http.Handler, so we can call their ServeHTTP method - // directly and pass in our Request and ResponseRecorder. - handler.ServeHTTP(rr, req) - - // Check the status code is what we expect. - if status := rr.Code; status != http.StatusOK { - t.Errorf("handler returned wrong status code: got %v want %v", - status, http.StatusOK) - } - - // Check the response body is what we expect. - expected := "" - if !strings.Contains(rr.Body.String(), expected) { - t.Errorf("handler returned unexpected body: got %v want %v", - rr.Body.String(), expected) - } -} - -func TestPublishHandler_POST_FilterPolicyRejectsTheMessage(t *testing.T) { - // Create a request to pass to our handler. We don't have any query parameters for now, so we'll - // pass 'nil' as the third parameter. - req, err := http.NewRequest("POST", "/", nil) - if err != nil { - t.Fatal(err) - } - - // We set up queue so later we can check if anything was posted there - queueName := "testingQueue" - queueUrl := "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port + "/queue/" + queueName - queueArn := "arn:aws:sqs:" + app.CurrentEnvironment.Region + ":000000000000:" + queueName - app.SyncQueues.Queues[queueName] = &app.Queue{ - Name: queueName, - VisibilityTimeout: 30, - Arn: queueArn, - URL: queueUrl, - IsFIFO: app.HasFIFOQueueName(queueName), - } - - // We set up a topic with the corresponding Subscription including FilterPolicy - topicName := "testingTopic" - topicArn := "arn:aws:sns:" + app.CurrentEnvironment.Region + ":000000000000:" + topicName - subArn, _ := common.NewUUID() - subArn = topicArn + ":" + subArn - app.SyncTopics.Topics[topicName] = &app.Topic{Name: topicName, Arn: topicArn, Subscriptions: []*app.Subscription{ - { - EndPoint: app.SyncQueues.Queues[queueName].Arn, - Protocol: "sqs", - SubscriptionArn: subArn, - FilterPolicy: &app.FilterPolicy{ - "foo": {"bar"}, // set up FilterPolicy for attribute `foo` to be equal `bar` - }, - }, - }} - - form := url.Values{} - form.Add("TopicArn", topicArn) - form.Add("Message", "TestMessage1") - form.Add("MessageAttributes.entry.1.Name", "foo") // special format of parameter for MessageAttribute - form.Add("MessageAttributes.entry.1.Value.StringValue", "baz") // we actually sent attribute `foo` to be equal `baz` - req.PostForm = form - - // We create a ResponseRecorder (which satisfies http.ResponseWriter) to record the response. - rr := httptest.NewRecorder() - handler := http.HandlerFunc(Publish) - - // Our handlers satisfy http.Handler, so we can call their ServeHTTP method - // directly and pass in our Request and ResponseRecorder. - handler.ServeHTTP(rr, req) - - // Check the status code is what we expect. - if status := rr.Code; status != http.StatusOK { - t.Errorf("handler returned wrong status code: got %v want %v", - status, http.StatusOK) - } - - // Check the response body is what we expect. - expected := "" - if !strings.Contains(rr.Body.String(), expected) { - t.Errorf("handler returned unexpected body: got %v want %v", - rr.Body.String(), expected) - } - - // check of the queue is empty - if len(app.SyncQueues.Queues[queueName].Messages) != 0 { - t.Errorf("queue contains unexpected messages: got %v want %v", - len(app.SyncQueues.Queues[queueName].Messages), 0) - } -} - -func TestPublishHandler_POST_FilterPolicyPassesTheMessage(t *testing.T) { - // Create a request to pass to our handler. We don't have any query parameters for now, so we'll - // pass 'nil' as the third parameter. - req, err := http.NewRequest("POST", "/", nil) - if err != nil { - t.Fatal(err) - } - - // We set up queue so later we can check if anything was posted there - queueName := "testingQueue" - queueUrl := "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port + "/queue/" + queueName - queueArn := "arn:aws:sqs:" + app.CurrentEnvironment.Region + ":000000000000:" + queueName - app.SyncQueues.Queues[queueName] = &app.Queue{ - Name: queueName, - VisibilityTimeout: 30, - Arn: queueArn, - URL: queueUrl, - IsFIFO: app.HasFIFOQueueName(queueName), - } - - // We set up a topic with the corresponding Subscription including FilterPolicy - topicName := "testingTopic" - topicArn := "arn:aws:sns:" + app.CurrentEnvironment.Region + ":000000000000:" + topicName - subArn, _ := common.NewUUID() - subArn = topicArn + ":" + subArn - app.SyncTopics.Topics[topicName] = &app.Topic{Name: topicName, Arn: topicArn, Subscriptions: []*app.Subscription{ - { - EndPoint: app.SyncQueues.Queues[queueName].Arn, - Protocol: "sqs", - SubscriptionArn: subArn, - FilterPolicy: &app.FilterPolicy{ - "foo": {"bar"}, // set up FilterPolicy for attribute `foo` to be equal `bar` - }, - }, - }} - - form := url.Values{} - form.Add("TopicArn", topicArn) - form.Add("Message", "TestMessage1") - form.Add("MessageAttributes.entry.1.Name", "foo") // special format of parameter for MessageAttribute - form.Add("MessageAttributes.entry.1.Value.DataType", "String") // Datatype must be specified for proper parsing by aws - form.Add("MessageAttributes.entry.1.Value.StringValue", "bar") // we actually sent attribute `foo` to be equal `baz` - req.PostForm = form - - // We create a ResponseRecorder (which satisfies http.ResponseWriter) to record the response. - rr := httptest.NewRecorder() - handler := http.HandlerFunc(Publish) - - // Our handlers satisfy http.Handler, so we can call their ServeHTTP method - // directly and pass in our Request and ResponseRecorder. - handler.ServeHTTP(rr, req) - - // Check the status code is what we expect. - if status := rr.Code; status != http.StatusOK { - t.Errorf("handler returned wrong status code: got %v want %v", - status, http.StatusOK) - } - - // Check the response body is what we expect. - expected := "" - if !strings.Contains(rr.Body.String(), expected) { - t.Errorf("handler returned unexpected body: got %v want %v", - rr.Body.String(), expected) - } - - // check of the queue is empty - if len(app.SyncQueues.Queues[queueName].Messages) != 1 { - t.Errorf("queue contains unexpected messages: got %v want %v", - len(app.SyncQueues.Queues[queueName].Messages), 1) - } -} - -func TestPublish_No_Queue_Error_handler_POST_Success(t *testing.T) { - defer func() { - test.ResetApp() - }() - - // Create a request to pass to our handler. We don't have any query parameters for now, so we'll - // pass 'nil' as the third parameter. - req, err := http.NewRequest("POST", "/", nil) - if err != nil { - t.Fatal(err) - } - - form := url.Values{} - form.Add("TopicArn", "arn:aws:sns:local:000000000000:UnitTestTopic1") - form.Add("Message", "TestMessage1") - req.PostForm = form - - // Prepare existant topic - topic := &app.Topic{ - Name: "UnitTestTopic1", - Arn: "arn:aws:sns:local:000000000000:UnitTestTopic1", - } - app.SyncTopics.Topics["UnitTestTopic1"] = topic - - // We create a ResponseRecorder (which satisfies http.ResponseWriter) to record the response. - rr := httptest.NewRecorder() - handler := http.HandlerFunc(Publish) - - // Our handlers satisfy http.Handler, so we can call their ServeHTTP method - // directly and pass in our Request and ResponseRecorder. - handler.ServeHTTP(rr, req) - - // Check the status code is what we expect. - if status := rr.Code; status != http.StatusOK { - t.Errorf("handler returned wrong status code: got %v want %v", - status, http.StatusOK) - } - - // Check the response body is what we expect. - expected := "" - if !strings.Contains(rr.Body.String(), expected) { - t.Errorf("handler returned unexpected body: got %v want %v", - rr.Body.String(), expected) - } -} - // TODO - add a subscription and I think this should work func TestListSubscriptionByTopicResponse_No_Owner(t *testing.T) { conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "Local") diff --git a/app/gosns/publish.go b/app/gosns/publish.go new file mode 100644 index 00000000..34c7b0ae --- /dev/null +++ b/app/gosns/publish.go @@ -0,0 +1,201 @@ +package gosns + +import ( + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + "github.com/google/uuid" + + "github.com/Admiral-Piett/goaws/app/interfaces" + "github.com/Admiral-Piett/goaws/app/models" + "github.com/Admiral-Piett/goaws/app/utils" + + "github.com/Admiral-Piett/goaws/app" + "github.com/Admiral-Piett/goaws/app/common" + log "github.com/sirupsen/logrus" +) + +// TODO - Admiral-Piett - Pick a MessageAttribute style and get rid of `utils.ConvertToOldMessageAttributeValueStructure` + +// aws --endpoint-url http://localhost:47194 sns publish --topic-arn arn:aws:sns:yopa-local:000000000000:test1 --message "This is a test" +func PublishV1(req *http.Request) (int, interfaces.AbstractResponseBody) { + requestBody := models.NewPublishRequest() + ok := utils.REQUEST_TRANSFORMER(requestBody, req, false) + if !ok { + log.Error("Invalid Request - PublishV1") + return utils.CreateErrorResponseV1("InvalidParameterValue", false) + } + + // TODO - support TargetArn + if requestBody.TopicArn == "" || requestBody.Message == "" { + return utils.CreateErrorResponseV1("InvalidParameterValue", false) + } + + arnSegments := strings.Split(requestBody.TopicArn, ":") + topicName := arnSegments[len(arnSegments)-1] + + _, ok = app.SyncTopics.Topics[topicName] + if ok { + log.WithFields(log.Fields{ + "topic": topicName, + "topicArn": requestBody.TopicArn, + "subject": requestBody.Subject, + }).Debug("Publish to Topic") + for _, subscription := range app.SyncTopics.Topics[topicName].Subscriptions { + switch app.Protocol(subscription.Protocol) { + case app.ProtocolSQS: + err := publishSQS(subscription, topicName, requestBody) + if err != nil { + utils.CreateErrorResponseV1(err.Error(), false) + } + case app.ProtocolHTTP: + fallthrough + case app.ProtocolHTTPS: + publishHTTP(subscription, requestBody) + } + } + } else { + return utils.CreateErrorResponseV1("TopicNotFound", false) + } + + //Create the response + respStruct := models.PublishResponse{ + Xmlns: models.BASE_XMLNS, + Result: models.PublishResult{ + MessageId: uuid.NewString(), + }, + Metadata: app.ResponseMetadata{ + RequestId: uuid.NewString(), + }, + } + return http.StatusOK, respStruct +} + +func publishSQS(subscription *app.Subscription, topicName string, requestBody *models.PublishRequest) error { + messageAttributes := utils.ConvertToOldMessageAttributeValueStructure(requestBody.MessageAttributes) + if subscription.FilterPolicy != nil && !subscription.FilterPolicy.IsSatisfiedBy(messageAttributes) { + return nil + } + + endPoint := subscription.EndPoint + uriSegments := strings.Split(endPoint, "/") + queueName := uriSegments[len(uriSegments)-1] + arnSegments := strings.Split(queueName, ":") + queueName = arnSegments[len(arnSegments)-1] + + if _, ok := app.SyncQueues.Queues[queueName]; ok { + msg := app.Message{} + + if subscription.Raw == false { + m, err := createMessageBody(subscription, requestBody.Message, requestBody.Subject, requestBody.MessageStructure, messageAttributes) + if err != nil { + return err + } + + msg.MessageBody = m + } else { + msg.MessageAttributes = messageAttributes + msg.MD5OfMessageAttributes = common.HashAttributes(messageAttributes) + m, err := extractMessageFromJSON(requestBody.Message, subscription.Protocol) + if err == nil { + msg.MessageBody = []byte(m) + } else { + msg.MessageBody = []byte(requestBody.Message) + } + } + + msg.MD5OfMessageBody = common.GetMD5Hash(requestBody.Message) + msg.Uuid, _ = common.NewUUID() + app.SyncQueues.Lock() + app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages, msg) + app.SyncQueues.Unlock() + + log.Infof("%s: Topic: %s(%s), Message: %s\n", time.Now().Format("2006-01-02 15:04:05"), topicName, queueName, msg.MessageBody) + } else { + log.Infof("%s: Queue %s does not exist, message discarded\n", time.Now().Format("2006-01-02 15:04:05"), queueName) + } + return nil +} + +func publishHTTP(subs *app.Subscription, requestBody *models.PublishRequest) { + messageAttributes := utils.ConvertToOldMessageAttributeValueStructure(requestBody.MessageAttributes) + id := uuid.NewString() + msg := app.SNSMessage{ + Type: "Notification", + MessageId: id, + TopicArn: requestBody.TopicArn, + Subject: requestBody.Subject, + Message: requestBody.Message, + Timestamp: time.Now().UTC().Format(time.RFC3339), + SignatureVersion: "1", + SigningCertURL: fmt.Sprintf("http://%s:%s/SimpleNotificationService/%s.pem", app.CurrentEnvironment.Host, app.CurrentEnvironment.Port, id), + UnsubscribeURL: fmt.Sprintf("http://%s:%s/?Action=Unsubscribe&SubscriptionArn=%s", app.CurrentEnvironment.Host, app.CurrentEnvironment.Port, subs.SubscriptionArn), + MessageAttributes: formatAttributes(messageAttributes), + } + + signature, err := signMessage(PrivateKEY, &msg) + if err != nil { + log.Error(err) + } else { + msg.Signature = signature + } + err = callEndpoint(subs.EndPoint, subs.SubscriptionArn, msg, subs.Raw) + if err != nil { + log.WithFields(log.Fields{ + "EndPoint": subs.EndPoint, + "ARN": subs.SubscriptionArn, + "error": err.Error(), + }).Error("Error calling endpoint") + } +} + +func createMessageBody(subs *app.Subscription, msg string, subject string, messageStructure string, + messageAttributes map[string]app.MessageAttributeValue) ([]byte, error) { + + msgId := uuid.NewString() + message := app.SNSMessage{ + Type: "Notification", + MessageId: msgId, + TopicArn: subs.TopicArn, + Subject: subject, + Timestamp: time.Now().UTC().Format(time.RFC3339), + SignatureVersion: "1", + SigningCertURL: fmt.Sprintf("http://%s:%s/SimpleNotificationService/%s.pem", app.CurrentEnvironment.Host, app.CurrentEnvironment.Port, msgId), + UnsubscribeURL: fmt.Sprintf("http://%s:%s/?Action=Unsubscribe&SubscriptionArn=%s", app.CurrentEnvironment.Host, app.CurrentEnvironment.Port, subs.SubscriptionArn), + MessageAttributes: formatAttributes(messageAttributes), + } + + if app.MessageStructure(messageStructure) == app.MessageStructureJSON { + m, err := extractMessageFromJSON(msg, subs.Protocol) + if err != nil { + return nil, err + } + message.Message = m + } else { + message.Message = msg + } + + signature, err := signMessage(PrivateKEY, &message) + if err != nil { + log.Error(err) + } else { + message.Signature = signature + } + + byteMsg, _ := json.Marshal(message) + return byteMsg, nil +} + +func formatAttributes(values map[string]app.MessageAttributeValue) map[string]app.MsgAttr { + attr := make(map[string]app.MsgAttr) + for k, v := range values { + attr[k] = app.MsgAttr{ + Type: v.DataType, + Value: v.Value, + } + } + return attr +} diff --git a/app/gosns/publish_test.go b/app/gosns/publish_test.go new file mode 100644 index 00000000..bf93bb86 --- /dev/null +++ b/app/gosns/publish_test.go @@ -0,0 +1,499 @@ +package gosns + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/Admiral-Piett/goaws/app/fixtures" + + "github.com/Admiral-Piett/goaws/app" + "github.com/Admiral-Piett/goaws/app/conf" + "github.com/Admiral-Piett/goaws/app/interfaces" + "github.com/Admiral-Piett/goaws/app/models" + "github.com/Admiral-Piett/goaws/app/test" + "github.com/Admiral-Piett/goaws/app/utils" + "github.com/stretchr/testify/assert" +) + +func TestPublishV1_success_sqs(t *testing.T) { + conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + test.ResetApp() + utils.REQUEST_TRANSFORMER = utils.TransformRequest + }() + + topicArn := app.SyncTopics.Topics["unit-topic1"].Arn + + message := "{\"IAm\": \"aMessage\"}" + utils.REQUEST_TRANSFORMER = func(resultingStruct interfaces.AbstractRequestBody, req *http.Request, emptyRequestValid bool) (success bool) { + v := resultingStruct.(*models.PublishRequest) + *v = models.PublishRequest{ + TopicArn: topicArn, + Message: message, + } + return true + } + + _, r := test.GenerateRequestInfo("POST", "/", nil, true) + status, response := PublishV1(r) + + assert.Equal(t, http.StatusOK, status) + _, ok := response.(models.PublishResponse) + assert.True(t, ok) + + messages := app.SyncQueues.Queues["subscribed-queue1"].Messages + assert.Len(t, messages, 1) + assert.Equal(t, message, string(messages[0].MessageBody)) +} + +func TestPublishV1_success_http(t *testing.T) { + conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + test.ResetApp() + utils.REQUEST_TRANSFORMER = utils.TransformRequest + }() + + topicArn := app.SyncTopics.Topics["unit-topic-http"].Arn + + message := "{\"IAm\": \"aMessage\"}" + utils.REQUEST_TRANSFORMER = func(resultingStruct interfaces.AbstractRequestBody, req *http.Request, emptyRequestValid bool) (success bool) { + v := resultingStruct.(*models.PublishRequest) + *v = models.PublishRequest{ + TopicArn: topicArn, + Message: message, + } + return true + } + + _, r := test.GenerateRequestInfo("POST", "/", nil, true) + status, response := PublishV1(r) + + assert.Equal(t, http.StatusOK, status) + _, ok := response.(models.PublishResponse) + assert.True(t, ok) +} + +func TestPublishV1_success_https(t *testing.T) { + conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + test.ResetApp() + utils.REQUEST_TRANSFORMER = utils.TransformRequest + }() + + topicArn := app.SyncTopics.Topics["unit-topic-http"].Arn + + app.SyncTopics.Lock() + app.SyncTopics.Topics["unit-topic-http"].Subscriptions[0].Protocol = "https" + app.SyncTopics.Unlock() + + message := "{\"IAm\": \"aMessage\"}" + utils.REQUEST_TRANSFORMER = func(resultingStruct interfaces.AbstractRequestBody, req *http.Request, emptyRequestValid bool) (success bool) { + v := resultingStruct.(*models.PublishRequest) + *v = models.PublishRequest{ + TopicArn: topicArn, + Message: message, + } + return true + } + + _, r := test.GenerateRequestInfo("POST", "/", nil, true) + status, response := PublishV1(r) + + assert.Equal(t, http.StatusOK, status) + _, ok := response.(models.PublishResponse) + assert.True(t, ok) +} + +func TestPublishV1_success_with_optional_fields(t *testing.T) { + conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + test.ResetApp() + utils.REQUEST_TRANSFORMER = utils.TransformRequest + }() + + topicArn := app.SyncTopics.Topics["unit-topic1"].Arn + + message := "{\"IAm\": \"aMessage\"}" + utils.REQUEST_TRANSFORMER = func(resultingStruct interfaces.AbstractRequestBody, req *http.Request, emptyRequestValid bool) (success bool) { + v := resultingStruct.(*models.PublishRequest) + *v = models.PublishRequest{ + TopicArn: topicArn, + Message: message, + MessageAttributes: map[string]models.MessageAttributeValue{ + "test": models.MessageAttributeValue{ + DataType: "string", + StringValue: "value", + }, + }, + MessageDeduplicationId: "dedupe-id", + MessageGroupId: "group-id", + MessageStructure: "json", + PhoneNumber: "phone-number", + Subject: "subject", + TargetArn: "target-arn", + } + return true + } + + _, r := test.GenerateRequestInfo("POST", "/", nil, true) + status, response := PublishV1(r) + + assert.Equal(t, http.StatusOK, status) + _, ok := response.(models.PublishResponse) + assert.True(t, ok) + + messages := app.SyncQueues.Queues["subscribed-queue1"].Messages + assert.Len(t, messages, 1) + assert.Equal(t, message, string(messages[0].MessageBody)) +} + +func TestPublishV1_request_transformer_error(t *testing.T) { + conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + test.ResetApp() + utils.REQUEST_TRANSFORMER = utils.TransformRequest + }() + + utils.REQUEST_TRANSFORMER = func(resultingStruct interfaces.AbstractRequestBody, req *http.Request, emptyRequestValid bool) (success bool) { + return false + } + + _, r := test.GenerateRequestInfo("POST", "/", nil, true) + status, _ := PublishV1(r) + + assert.Equal(t, http.StatusBadRequest, status) +} + +func TestPublishV1_request_missing_topic_arn(t *testing.T) { + conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + test.ResetApp() + utils.REQUEST_TRANSFORMER = utils.TransformRequest + }() + + message := "{\"IAm\": \"aMessage\"}" + utils.REQUEST_TRANSFORMER = func(resultingStruct interfaces.AbstractRequestBody, req *http.Request, emptyRequestValid bool) (success bool) { + v := resultingStruct.(*models.PublishRequest) + *v = models.PublishRequest{ + Message: message, + } + return true + } + + _, r := test.GenerateRequestInfo("POST", "/", nil, true) + status, _ := PublishV1(r) + + assert.Equal(t, http.StatusBadRequest, status) +} + +func TestPublishV1_request_missing_message(t *testing.T) { + conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + test.ResetApp() + utils.REQUEST_TRANSFORMER = utils.TransformRequest + }() + + topicArn := app.SyncTopics.Topics["unit-topic1"].Arn + + utils.REQUEST_TRANSFORMER = func(resultingStruct interfaces.AbstractRequestBody, req *http.Request, emptyRequestValid bool) (success bool) { + v := resultingStruct.(*models.PublishRequest) + *v = models.PublishRequest{ + TopicArn: topicArn, + } + return true + } + + _, r := test.GenerateRequestInfo("POST", "/", nil, true) + status, _ := PublishV1(r) + + assert.Equal(t, http.StatusBadRequest, status) +} + +func TestPublishV1_request_invalid_topic(t *testing.T) { + conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + test.ResetApp() + utils.REQUEST_TRANSFORMER = utils.TransformRequest + }() + + message := "{\"IAm\": \"aMessage\"}" + utils.REQUEST_TRANSFORMER = func(resultingStruct interfaces.AbstractRequestBody, req *http.Request, emptyRequestValid bool) (success bool) { + v := resultingStruct.(*models.PublishRequest) + *v = models.PublishRequest{ + TopicArn: "garbage", + Message: message, + } + return true + } + + _, r := test.GenerateRequestInfo("POST", "/", nil, true) + status, _ := PublishV1(r) + + assert.Equal(t, http.StatusBadRequest, status) +} + +func Test_publishSQS_success_raw(t *testing.T) { + conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + test.ResetApp() + }() + + topicArn := app.SyncTopics.Topics["unit-topic1"].Arn + + message := "{\"IAm\": \"aMessage\"}" + sub := app.SyncTopics.Topics["unit-topic1"].Subscriptions[0] + request := models.PublishRequest{ + TopicArn: topicArn, + Message: message, + } + err := publishSQS(sub, "unit-topic1", &request) + + assert.Nil(t, err) + + messages := app.SyncQueues.Queues["subscribed-queue1"].Messages + assert.Len(t, messages, 1) + assert.Equal(t, message, string(messages[0].MessageBody)) +} + +// Most other scenarios should be tested in the functions above, if reasonably possible +func Test_publishSQS_success_json(t *testing.T) { + conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + test.ResetApp() + }() + + topicArn := app.SyncTopics.Topics["unit-topic1"].Arn + + message := "{\"IAm\": \"aMessage\"}" + + app.SyncTopics.Lock() + sub := app.SyncTopics.Topics["unit-topic1"].Subscriptions[0] + sub.Raw = false + app.SyncTopics.Unlock() + request := models.PublishRequest{ + TopicArn: topicArn, + Message: message, + } + err := publishSQS(sub, "unit-topic1", &request) + + assert.Nil(t, err) + + messages := app.SyncQueues.Queues["subscribed-queue1"].Messages + assert.Len(t, messages, 1) + + body := string(messages[0].MessageBody) + assert.Contains(t, body, "\"Message\":\"{\\\"IAm\\\": \\\"aMessage\\\"}\"") + assert.Contains(t, body, "Type") + assert.Contains(t, body, "MessageId") + assert.Contains(t, body, "TopicArn") + assert.Contains(t, body, "Signature") + assert.Contains(t, body, "SigningCertURL") + assert.Contains(t, body, "UnsubscribeURL") + assert.Contains(t, body, "SubscribeURL") + assert.Contains(t, body, "MessageAttributes") +} + +func Test_publishSQS_filter_policy_not_satisfied_by_attributes(t *testing.T) { + conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + test.ResetApp() + }() + + topicArn := app.SyncTopics.Topics["unit-topic1"].Arn + message := "{\"IAm\": \"aMessage\"}" + + app.SyncTopics.Lock() + sub := app.SyncTopics.Topics["unit-topic1"].Subscriptions[0] + sub.FilterPolicy = &app.FilterPolicy{"foo": []string{"bar"}} + app.SyncTopics.Unlock() + + request := models.PublishRequest{ + TopicArn: topicArn, + Message: message, + MessageAttributes: map[string]models.MessageAttributeValue{ + "invalid": models.MessageAttributeValue{ + DataType: "string", + StringValue: "garbage", + }, + }, + } + err := publishSQS(sub, "unit-topic1", &request) + + assert.Nil(t, err) +} + +func Test_publishSQS_missing_queue_returns_nil(t *testing.T) { + conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + test.ResetApp() + }() + + topicArn := app.SyncTopics.Topics["unit-topic1"].Arn + + message := "{\"IAm\": \"aMessage\"}" + + app.SyncTopics.Lock() + sub := app.SyncTopics.Topics["unit-topic1"].Subscriptions[0] + sub.EndPoint = "garbage" + app.SyncTopics.Unlock() + + request := models.PublishRequest{ + TopicArn: topicArn, + Message: message, + } + err := publishSQS(sub, "unit-topic1", &request) + + assert.Nil(t, err) +} + +// Most other scenarios should be tested in the functions above, if reasonably possible +func Test_publishHTTP_success(t *testing.T) { + called := false + subscribedServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + w.WriteHeader(200) + })) + + conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + test.ResetApp() + subscribedServer.Close() + }() + + topicArn := app.SyncTopics.Topics["unit-topic1"].Arn + message := "{\"IAm\": \"aMessage\"}" + + app.SyncTopics.Lock() + sub := app.SyncTopics.Topics["unit-topic1"].Subscriptions[0] + sub.EndPoint = subscribedServer.URL + app.SyncTopics.Unlock() + + request := models.PublishRequest{ + TopicArn: topicArn, + Message: message, + } + + publishHTTP(sub, &request) + + assert.True(t, called) +} + +func Test_publishHTTP_callEndpoint_failure(t *testing.T) { + conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + test.ResetApp() + }() + + topicArn := app.SyncTopics.Topics["unit-topic1"].Arn + message := "{\"IAm\": \"aMessage\"}" + + app.SyncTopics.Lock() + sub := app.SyncTopics.Topics["unit-topic1"].Subscriptions[0] + app.SyncTopics.Unlock() + + request := models.PublishRequest{ + TopicArn: topicArn, + Message: message, + } + + publishHTTP(sub, &request) + // swallows all errors +} + +func Test_createMessageBody_success_json(t *testing.T) { + conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + test.ResetApp() + }() + + message := "{\"default\": \"message\"}" + subject := "I'm the subject" + attrs := map[string]app.MessageAttributeValue{ + "test": app.MessageAttributeValue{ + Name: "MyAttr", + DataType: "string", + Value: "value", + }, + } + + sub := app.SyncTopics.Topics["unit-topic1"].Subscriptions[0] + + result, err := createMessageBody(sub, message, subject, "json", attrs) + + assert.Nil(t, err) + + msg := &app.SNSMessage{} + json.Unmarshal(result, msg) + + assert.Equal(t, "Notification", msg.Type) + assert.Equal(t, "", msg.Token) + assert.Equal(t, fmt.Sprintf("%s:unit-topic1", fixtures.BASE_SNS_ARN), msg.TopicArn) + assert.Equal(t, "I'm the subject", msg.Subject) + assert.Equal(t, "message", msg.Message) + assert.Equal(t, "1", msg.SignatureVersion) + assert.Contains(t, msg.SigningCertURL, "http://host:port/SimpleNotificationService/") + assert.Contains(t, msg.UnsubscribeURL, "http://host:port/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:region:accountID:unit-topic1:") + assert.Equal(t, msg.MessageAttributes, map[string]app.MsgAttr{"test": app.MsgAttr{Type: "string", Value: "value"}}) +} + +func Test_createMessageBody_success_raw(t *testing.T) { + conf.LoadYamlConfig("../conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + test.ResetApp() + }() + + message := "{\"default\": \"message\"}" + subject := "I'm the subject" + attrs := map[string]app.MessageAttributeValue{ + "test": app.MessageAttributeValue{ + Name: "MyAttr", + DataType: "string", + Value: "value", + }, + } + + sub := app.SyncTopics.Topics["unit-topic1"].Subscriptions[0] + + result, err := createMessageBody(sub, message, subject, "not-json", attrs) + + assert.Nil(t, err) + + msg := &app.SNSMessage{} + json.Unmarshal(result, msg) + + assert.Equal(t, "Notification", msg.Type) + assert.Equal(t, "", msg.Token) + assert.Equal(t, fmt.Sprintf("%s:unit-topic1", fixtures.BASE_SNS_ARN), msg.TopicArn) + assert.Equal(t, "I'm the subject", msg.Subject) + assert.Equal(t, message, msg.Message) + assert.Equal(t, "1", msg.SignatureVersion) + assert.Contains(t, msg.SigningCertURL, "http://host:port/SimpleNotificationService/") + assert.Contains(t, msg.UnsubscribeURL, "http://host:port/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:region:accountID:unit-topic1:") + assert.Equal(t, msg.MessageAttributes, map[string]app.MsgAttr{"test": app.MsgAttr{Type: "string", Value: "value"}}) +} + +func Test_formatAttributes_success(t *testing.T) { + attrs := map[string]app.MessageAttributeValue{ + "test1": app.MessageAttributeValue{ + Name: "MyAttr", + DataType: "string", + Value: "value1", + }, + "test2": app.MessageAttributeValue{ + Name: "MyAttr", + DataType: "string", + Value: "value2", + }, + } + expected := map[string]app.MsgAttr{ + "test1": app.MsgAttr{Type: "string", Value: "value1"}, + "test2": app.MsgAttr{Type: "string", Value: "value2"}, + } + + result := formatAttributes(attrs) + + assert.Equal(t, expected, result) +} diff --git a/app/gosns/subscribe_test.go b/app/gosns/subscribe_test.go index dc670cf2..2ae46e79 100644 --- a/app/gosns/subscribe_test.go +++ b/app/gosns/subscribe_test.go @@ -108,7 +108,7 @@ func TestSubscribeV1_success_duplicate_subscription(t *testing.T) { v := resultingStruct.(*models.SubscribeRequest) *v = models.SubscribeRequest{ TopicArn: fmt.Sprintf("%s:%s", fixtures.BASE_SNS_ARN, "unit-topic1"), - Endpoint: fmt.Sprintf("%s:%s", fixtures.BASE_SQS_ARN, "subscribed-queue2"), + Endpoint: fmt.Sprintf("%s:%s", fixtures.BASE_SQS_ARN, "subscribed-queue1"), Protocol: "sqs", } return true @@ -122,7 +122,7 @@ func TestSubscribeV1_success_duplicate_subscription(t *testing.T) { subscriptions := app.SyncTopics.Topics["unit-topic1"].Subscriptions assert.Len(t, subscriptions, 1) - assert.Equal(t, fmt.Sprintf("%s:%s", fixtures.BASE_SQS_ARN, "subscribed-queue2"), subscriptions[0].EndPoint) + assert.Equal(t, fmt.Sprintf("%s:%s", fixtures.BASE_SQS_ARN, "subscribed-queue1"), subscriptions[0].EndPoint) assert.Equal(t, "sqs", subscriptions[0].Protocol) assert.True(t, subscriptions[0].Raw) assert.Contains(t, subscriptions[0].SubscriptionArn, fmt.Sprintf("%s:%s", fixtures.BASE_SNS_ARN, "unit-topic1")) @@ -157,7 +157,7 @@ func TestSubscribeV1_error_missing_topic(t *testing.T) { v := resultingStruct.(*models.SubscribeRequest) *v = models.SubscribeRequest{ TopicArn: fmt.Sprintf("%s:%s", fixtures.BASE_SNS_ARN, "garbage"), - Endpoint: fmt.Sprintf("%s:%s", fixtures.BASE_SQS_ARN, "subscribed-queue2"), + Endpoint: fmt.Sprintf("%s:%s", fixtures.BASE_SQS_ARN, "subscribed-queue1"), Protocol: "sqs", } return true diff --git a/app/gosqs/send_message.go b/app/gosqs/send_message.go index ceb237a8..cc180f1a 100644 --- a/app/gosqs/send_message.go +++ b/app/gosqs/send_message.go @@ -60,7 +60,7 @@ func SendMessageV1(req *http.Request) (int, interfaces.AbstractResponseBody) { log.Debugf("Putting Message in Queue: [%s]", queueName) msg := app.Message{MessageBody: []byte(messageBody)} if len(messageAttributes) > 0 { - oldStyleMessageAttributes := convertToOldMessageAttributeValueStructure(messageAttributes) + oldStyleMessageAttributes := utils.ConvertToOldMessageAttributeValueStructure(messageAttributes) msg.MessageAttributes = oldStyleMessageAttributes msg.MD5OfMessageAttributes = common.HashAttributes(oldStyleMessageAttributes) } @@ -100,32 +100,3 @@ func SendMessageV1(req *http.Request) (int, interfaces.AbstractResponseBody) { return http.StatusOK, respStruct } - -// TODO: -// Refactor internal model for MessageAttribute between SendMessage and ReceiveMessage -// from app.MessageAttributeValue(old) to models.MessageAttributeValue(new) and remove this temporary function. -func convertToOldMessageAttributeValueStructure(newValues map[string]models.MessageAttributeValue) map[string]app.MessageAttributeValue { - attributes := make(map[string]app.MessageAttributeValue) - - for name, entry := range newValues { - // StringListValue and BinaryListValue is currently not implemented - // Please refer app/gosqs/message_attributes.go - value := "" - valueKey := "" - if entry.StringValue != "" { - value = entry.StringValue - valueKey = "StringValue" - } else if entry.BinaryValue != "" { - value = entry.BinaryValue - valueKey = "BinaryValue" - } - attributes[name] = app.MessageAttributeValue{ - Name: name, - DataType: entry.DataType, - Value: value, - ValueKey: valueKey, - } - } - - return attributes -} diff --git a/app/models/models.go b/app/models/models.go index 5a42bb4f..75d8ccbc 100644 --- a/app/models/models.go +++ b/app/models/models.go @@ -23,3 +23,12 @@ var AVAILABLE_QUEUE_ATTRIBUTES = map[string]bool{ "LastModifiedTimestamp": true, "QueueArn": true, } + +// TODO - reconcile this with app.MessageAttributeValue - deal with convertToOldMessageAttributeValueStructure +type MessageAttributeValue struct { + BinaryListValues []string `json:"BinaryListValues"` // currently unsupported by AWS + BinaryValue string `json:"BinaryValue"` + DataType string `json:"DataType"` + StringListValues []string `json:"StringListValues"` // currently unsupported by AWS + StringValue string `json:"StringValue"` +} diff --git a/app/models/responses.go b/app/models/responses.go index 07fa1e07..1ffb99ed 100644 --- a/app/models/responses.go +++ b/app/models/responses.go @@ -342,3 +342,22 @@ func (r UnsubscribeResponse) GetResult() interface{} { func (r UnsubscribeResponse) GetRequestId() string { return r.Metadata.RequestId } + +/*** Publish ***/ +type PublishResult struct { + MessageId string `xml:"MessageId"` +} + +type PublishResponse struct { + Xmlns string `xml:"xmlns,attr"` + Result PublishResult `xml:"PublishResult"` + Metadata app.ResponseMetadata `xml:"ResponseMetadata"` +} + +func (r PublishResponse) GetResult() interface{} { + return r.Result +} + +func (r PublishResponse) GetRequestId() string { + return r.Metadata.RequestId +} diff --git a/app/models/sns.go b/app/models/sns.go index eab5352b..422e8cd2 100644 --- a/app/models/sns.go +++ b/app/models/sns.go @@ -46,7 +46,6 @@ type TopicAttributes struct { } func (r *CreateTopicRequest) SetAttributesFromForm(values url.Values) { - for i := 1; true; i++ { nameKey := fmt.Sprintf("Attribute.%d.Name", i) attrName := values.Get(nameKey) @@ -183,3 +182,45 @@ type UnsubscribeRequest struct { } func (r *UnsubscribeRequest) SetAttributesFromForm(values url.Values) {} + +func NewPublishRequest() *PublishRequest { + return &PublishRequest{} +} + +type PublishRequest struct { + Message string `json:"Message" schema:"Message"` + MessageAttributes map[string]MessageAttributeValue `json:"MessageAttributes" schema:"MessageAttributes"` + MessageDeduplicationId string `json:"MessageDeduplicationId" schema:"MessageDeduplicationId"` // Not implemented + MessageGroupId string `json:"MessageGroupId" schema:"MessageGroupId"` // Not implemented + MessageStructure string `json:"MessageStructure" schema:"MessageStructure"` + PhoneNumber string `json:"PhoneNumber" schema:"PhoneNumber"` // Not implemented + Subject string `json:"Subject" schema:"Subject"` + TargetArn string `json:"TargetArn" schema:"TargetArn"` // Not implemented + TopicArn string `json:"TopicArn" schema:"TopicArn"` +} + +func (r *PublishRequest) SetAttributesFromForm(values url.Values) { + for i := 1; true; i++ { + nameKey := fmt.Sprintf("MessageAttributes.entry.%d.Name", i) + name := values.Get(nameKey) + if name == "" { + break + } + + dataTypeKey := fmt.Sprintf("MessageAttributes.entry.%d.Value.DataType", i) + dataType := values.Get(dataTypeKey) + if dataType == "" { + log.Warnf("DataType of MessageAttribute %s is missing, MD5 checksum will most probably be wrong!\n", name) + continue + } + + stringValue := values.Get(fmt.Sprintf("MessageAttributes.entry.%d.Value.StringValue", i)) + binaryValue := values.Get(fmt.Sprintf("MessageAttributes.entry.%d.Value.BinaryValue", i)) + + r.MessageAttributes[name] = MessageAttributeValue{ + DataType: dataType, + StringValue: stringValue, + BinaryValue: binaryValue, + } + } +} diff --git a/app/models/sqs.go b/app/models/sqs.go index 9e5fbc79..9b38967d 100644 --- a/app/models/sqs.go +++ b/app/models/sqs.go @@ -184,13 +184,6 @@ type SendMessageRequest struct { MessageSystemAttributes map[string]MessageAttributeValue `json:"MessageSystemAttributes" schema:"MessageSystemAttributes"` QueueUrl string `json:"QueueUrl" schema:"QueueUrl"` } -type MessageAttributeValue struct { - BinaryListValues []string `json:"BinaryListValues"` // currently unsupported by AWS - BinaryValue string `json:"BinaryValue"` - DataType string `json:"DataType"` - StringListValues []string `json:"StringListValues"` // currently unsupported by AWS - StringValue string `json:"StringValue"` -} func (r *SendMessageRequest) SetAttributesFromForm(values url.Values) { for i := 1; true; i++ { @@ -215,10 +208,6 @@ func (r *SendMessageRequest) SetAttributesFromForm(values url.Values) { StringValue: stringValue, BinaryValue: binaryValue, } - - if _, ok := r.MessageAttributes[name]; !ok { - log.Warnf("StringValue or BinaryValue of MessageAttribute %s is missing, MD5 checksum will most probably be wrong!\n", name) - } } } diff --git a/app/router/router.go b/app/router/router.go index 6b7293fb..6f979443 100644 --- a/app/router/router.go +++ b/app/router/router.go @@ -80,6 +80,7 @@ var routingTableV1 = map[string]func(r *http.Request) (int, interfaces.AbstractR "CreateTopic": sns.CreateTopicV1, "Subscribe": sns.SubscribeV1, "Unsubscribe": sns.UnsubscribeV1, + "Publish": sns.PublishV1, } var routingTable = map[string]http.HandlerFunc{ @@ -94,7 +95,6 @@ var routingTable = map[string]http.HandlerFunc{ "GetSubscriptionAttributes": sns.GetSubscriptionAttributes, "ListSubscriptionsByTopic": sns.ListSubscriptionsByTopic, "ListSubscriptions": sns.ListSubscriptions, - "Publish": sns.Publish, // SNS Internal "ConfirmSubscription": sns.ConfirmSubscription, diff --git a/app/router/router_test.go b/app/router/router_test.go index 529ff9b5..dac026f8 100644 --- a/app/router/router_test.go +++ b/app/router/router_test.go @@ -272,6 +272,7 @@ func TestActionHandler_v0_xml(t *testing.T) { "CreateTopic": sns.CreateTopicV1, "Subscribe": sns.SubscribeV1, "Unsubscribe": sns.UnsubscribeV1, + "Publish": sns.PublishV1, } routingTable = map[string]http.HandlerFunc{ // SQS @@ -285,7 +286,6 @@ func TestActionHandler_v0_xml(t *testing.T) { "GetSubscriptionAttributes": sns.GetSubscriptionAttributes, "ListSubscriptionsByTopic": sns.ListSubscriptionsByTopic, "ListSubscriptions": sns.ListSubscriptions, - "Publish": sns.Publish, // SNS Internal "ConfirmSubscription": sns.ConfirmSubscription, diff --git a/app/servertest/server_test.go b/app/servertest/server_test.go index 48be85a0..271047c0 100644 --- a/app/servertest/server_test.go +++ b/app/servertest/server_test.go @@ -4,24 +4,11 @@ import ( "errors" "testing" - "encoding/json" - "fmt" - "io/ioutil" - "net" - "net/http" - "strings" - "time" - - "github.com/Admiral-Piett/goaws/app" - "github.com/Admiral-Piett/goaws/app/router" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/sns" "github.com/aws/aws-sdk-go/service/sqs" "github.com/aws/aws-sdk-go/service/sqs/sqsiface" - "github.com/gorilla/mux" - log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -115,46 +102,6 @@ func TestNewIntegration(t *testing.T) { } } -func TestSNSRoutes(t *testing.T) { - // Consume address - srv, err := NewSNSTest("localhost:4100", &snsTest{t: t}) - - noSetupError(t, err) - defer srv.Quit() - - creds := credentials.NewStaticCredentials("id", "secret", "token") - - awsConfig := aws.NewConfig(). - WithRegion("us-east-1"). - WithEndpoint(srv.URL()). - WithCredentials(creds) - - session1 := session.New(awsConfig) - client := sns.New(session1) - - response, err := client.CreateTopic(&sns.CreateTopicInput{ - Name: aws.String("testing"), - }) - require.NoError(t, err, "SNS Create Topic Failed") - - params := &sns.SubscribeInput{ - Protocol: aws.String("sqs"), // Required - TopicArn: response.TopicArn, // Required - Endpoint: aws.String(srv.URL() + "/local-sns"), - } - subscribeResponse, err := client.Subscribe(params) - require.NoError(t, err, "SNS Subscribe Failed") - t.Logf("Succesfully subscribed: %s\n", *subscribeResponse.SubscriptionArn) - - publishParams := &sns.PublishInput{ - Message: aws.String("Cool"), - TopicArn: response.TopicArn, - } - publishResponse, err := client.Publish(publishParams) - require.NoError(t, err, "SNS Publish Failed") - t.Logf("Succesfully published: %s\n", *publishResponse.MessageId) -} - func newSQS(t *testing.T, region string, endpoint string) *sqs.SQS { creds := credentials.NewStaticCredentials("id", "secret", "token") @@ -176,123 +123,3 @@ func noOp(sqsiface.SQSAPI, *string) error { func noSetupError(t *testing.T, err error) { require.NoError(t, err, "Failed to setup for test") } - -type snsTest struct { - t *testing.T -} - -func NewSNSTest(addr string, snsTest *snsTest) (*Server, error) { - if addr == "" { - addr = "localhost:0" - } - localURL := strings.Split(addr, ":") - app.CurrentEnvironment.Host = localURL[0] - app.CurrentEnvironment.Port = localURL[1] - log.WithFields(log.Fields{ - "host": app.CurrentEnvironment.Host, - "port": app.CurrentEnvironment.Port, - }).Info("URL Starting to listen") - - l, err := net.Listen("tcp", addr) - if err != nil { - return nil, fmt.Errorf("cannot listen on localhost: %v", err) - } - if err != nil { - return nil, fmt.Errorf("cannot listen on localhost: %v", err) - } - - r := mux.NewRouter() - r.Handle("/", router.New()) - snsTest.SetSNSRoutes("/local-sns", r, nil) - - srv := Server{listener: l, handler: r} - - go http.Serve(l, &srv) - - return &srv, nil -} - -// Define handlers for various AWS SNS POST calls -func (s *snsTest) SetSNSRoutes(urlPath string, r *mux.Router, handler http.Handler) { - - r.HandleFunc(urlPath, s.SubscribeConfirmHandle).Methods("POST").Headers("x-amz-sns-message-type", "SubscriptionConfirmation") - if handler != nil { - log.WithFields(log.Fields{ - "urlPath": urlPath, - }).Debug("handler not nil") - // handler is supposed to be wrapper that inturn calls NotificationHandle - r.Handle(urlPath, handler).Methods("POST").Headers("x-amz-sns-message-type", "Notification") - } else { - log.WithFields(log.Fields{ - "urlPath": urlPath, - }).Debug("handler nil") - // if no wrapper handler available then define anonymous handler and directly call NotificationHandle - r.HandleFunc(urlPath, func(rw http.ResponseWriter, req *http.Request) { - s.NotificationHandle(rw, req) - }).Methods("POST").Headers("x-amz-sns-message-type", "Notification") - } -} - -func (s *snsTest) SubscribeConfirmHandle(rw http.ResponseWriter, req *http.Request) { - //params := &sns.ConfirmSubscriptionInput{ - // Token: aws.String(msg.Token), // Required - // TopicArn: aws.String(msg.TopicArn), // Required - //} - var f interface{} - body, err := ioutil.ReadAll(req.Body) - if err != nil { - s.t.Log("Unable to Parse Body") - } - s.t.Log(string(body)) - err = json.Unmarshal(body, &f) - if err != nil { - s.t.Log("Unable to Unmarshal request") - } - - data := f.(map[string]interface{}) - s.t.Log(data["Type"].(string)) - - if data["Type"].(string) == "SubscriptionConfirmation" { - subscribeURL := data["SubscribeURL"].(string) - time.Sleep(time.Second) - response, err := http.Get(subscribeURL) - if err != nil { - s.t.Logf("Unable to confirm subscriptions. %s\n", err) - s.t.Fail() - } else { - s.t.Logf("Subscription Confirmed successfully. %d\n", response.StatusCode) - } - } else if data["Type"].(string) == "Notification" { - s.t.Log("Received this message : ", data["Message"].(string)) - } -} - -func (s *snsTest) NotificationHandle(rw http.ResponseWriter, req *http.Request) []byte { - subArn := req.Header.Get("X-Amz-Sns-Subscription-Arn") - - msg := app.SNSMessage{} - _, err := DecodeJSONMessage(req, &msg) - if err != nil { - log.Error(err) - return []byte{} - } - - s.t.Logf("NotificationHandle %s MSG(%s)", subArn, msg.Message) - return []byte(msg.Message) -} - -func DecodeJSONMessage(req *http.Request, v interface{}) ([]byte, error) { - - payload, err := ioutil.ReadAll(req.Body) - if err != nil { - return nil, err - } - if len(payload) == 0 { - return nil, errors.New("empty payload") - } - err = json.Unmarshal([]byte(payload), v) - if err != nil { - return nil, err - } - return payload, nil -} diff --git a/app/sns_messages.go b/app/sns_messages.go index 3e4fff4a..5917e54c 100644 --- a/app/sns_messages.go +++ b/app/sns_messages.go @@ -81,18 +81,6 @@ type ListSubscriptionsByTopicResponse struct { Metadata ResponseMetadata `xml:"ResponseMetadata"` } -/*** Publish ***/ - -type PublishResult struct { - MessageId string `xml:"MessageId"` -} - -type PublishResponse struct { - Xmlns string `xml:"xmlns,attr"` - Result PublishResult `xml:"PublishResult"` - Metadata ResponseMetadata `xml:"ResponseMetadata"` -} - /*** Delete Topic ***/ type DeleteTopicResponse struct { Xmlns string `xml:"xmlns,attr"` diff --git a/app/utils/utils.go b/app/utils/utils.go index 0ed52dda..e5b7305d 100644 --- a/app/utils/utils.go +++ b/app/utils/utils.go @@ -7,6 +7,8 @@ import ( "net/http" "net/url" + "github.com/Admiral-Piett/goaws/app" + "github.com/Admiral-Piett/goaws/app/models" "github.com/Admiral-Piett/goaws/app/interfaces" @@ -89,3 +91,32 @@ func CreateErrorResponseV1(errKey string, isSqs bool) (int, interfaces.AbstractR } return err.StatusCode(), respStruct } + +// TODO: +// Refactor internal model for MessageAttribute between SendMessage and ReceiveMessage +// from app.MessageAttributeValue(old) to models.MessageAttributeValue(new) and remove this temporary function. +func ConvertToOldMessageAttributeValueStructure(newValues map[string]models.MessageAttributeValue) map[string]app.MessageAttributeValue { + attributes := make(map[string]app.MessageAttributeValue) + + for name, entry := range newValues { + // StringListValue and BinaryListValue is currently not implemented + // Please refer app/gosqs/message_attributes.go + value := "" + valueKey := "" + if entry.StringValue != "" { + value = entry.StringValue + valueKey = "StringValue" + } else if entry.BinaryValue != "" { + value = entry.BinaryValue + valueKey = "BinaryValue" + } + attributes[name] = app.MessageAttributeValue{ + Name: name, + DataType: entry.DataType, + Value: value, + ValueKey: valueKey, + } + } + + return attributes +} diff --git a/smoke_tests/sns_publish_test.go b/smoke_tests/sns_publish_test.go new file mode 100644 index 00000000..30082e57 --- /dev/null +++ b/smoke_tests/sns_publish_test.go @@ -0,0 +1,510 @@ +package smoke_tests + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/aws/aws-sdk-go-v2/config" + + "github.com/Admiral-Piett/goaws/app/conf" + "github.com/Admiral-Piett/goaws/app/test" + + "github.com/gavv/httpexpect/v2" + + "github.com/Admiral-Piett/goaws/app" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sns" + "github.com/stretchr/testify/assert" +) + +func Test_Publish_sqs_json_raw(t *testing.T) { + server := generateServer() + defaultEnv := app.CurrentEnvironment + conf.LoadYamlConfig("../app/conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + server.Close() + test.ResetResources() + app.CurrentEnvironment = defaultEnv + }() + + sdkConfig, _ := config.LoadDefaultConfig(context.TODO()) + sdkConfig.BaseEndpoint = aws.String(server.URL) + snsClient := sns.NewFromConfig(sdkConfig) + + topicArn := app.SyncTopics.Topics["unit-topic1"].Arn + message := "{\"IAm\": \"aMessage\"}" + subject := "I am a subject" + response, err := snsClient.Publish(context.TODO(), &sns.PublishInput{ + TopicArn: &topicArn, + Message: &message, + Subject: &subject, + }) + + assert.Nil(t, err) + assert.NotNil(t, response) + + messages := app.SyncQueues.Queues["subscribed-queue1"].Messages + assert.Len(t, messages, 1) + assert.Equal(t, message, string(messages[0].MessageBody)) +} + +func Test_Publish_sqs_json_not_raw(t *testing.T) { + server := generateServer() + defaultEnv := app.CurrentEnvironment + conf.LoadYamlConfig("../app/conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + server.Close() + test.ResetResources() + app.CurrentEnvironment = defaultEnv + }() + + sdkConfig, _ := config.LoadDefaultConfig(context.TODO()) + sdkConfig.BaseEndpoint = aws.String(server.URL) + snsClient := sns.NewFromConfig(sdkConfig) + + topicArn := app.SyncTopics.Topics["unit-topic3"].Arn + message := "{\"IAm\": \"aMessage\"}" + subject := "I am a subject" + response, err := snsClient.Publish(context.TODO(), &sns.PublishInput{ + TopicArn: &topicArn, + Message: &message, + Subject: &subject, + }) + + assert.Nil(t, err) + assert.NotNil(t, response) + + messages := app.SyncQueues.Queues["subscribed-queue3"].Messages + assert.Len(t, messages, 1) + + body := string(messages[0].MessageBody) + assert.Contains(t, body, "\"Message\":\"{\\\"IAm\\\": \\\"aMessage\\\"}\"") + assert.Contains(t, body, "Type") + assert.Contains(t, body, "MessageId") + assert.Contains(t, body, "TopicArn") + assert.Contains(t, body, subject) + assert.Contains(t, body, "Signature") + assert.Contains(t, body, "SigningCertURL") + assert.Contains(t, body, "UnsubscribeURL") + assert.Contains(t, body, "SubscribeURL") + assert.Contains(t, body, "MessageAttributes") +} + +func Test_Publish_http_json(t *testing.T) { + server := generateServer() + defaultEnv := app.CurrentEnvironment + conf.LoadYamlConfig("../app/conf/mock-data/mock-config.yaml", "BaseUnitTests") + + called := false + httpMessage := "" + subscribedServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + buf := new(strings.Builder) + io.Copy(buf, r.Body) + httpMessage = buf.String() + + called = true + w.WriteHeader(200) + })) + + defer func() { + server.Close() + subscribedServer.Close() + test.ResetResources() + app.CurrentEnvironment = defaultEnv + }() + + sdkConfig, _ := config.LoadDefaultConfig(context.TODO()) + sdkConfig.BaseEndpoint = aws.String(server.URL) + snsClient := sns.NewFromConfig(sdkConfig) + + app.SyncTopics.Lock() + app.SyncTopics.Topics["unit-topic-http"].Subscriptions[0].EndPoint = subscribedServer.URL + app.SyncTopics.Unlock() + + topicArn := app.SyncTopics.Topics["unit-topic-http"].Arn + message := "{\"IAm\": \"aMessage\"}" + response, err := snsClient.Publish(context.TODO(), &sns.PublishInput{ + TopicArn: &topicArn, + Message: &message, + }) + + assert.Nil(t, err) + assert.NotNil(t, response) + + assert.True(t, called) + assert.Equal(t, "\"{\\\"IAm\\\": \\\"aMessage\\\"}\"", httpMessage) +} + +func Test_Publish_https_json_raw(t *testing.T) { + server := generateServer() + defaultEnv := app.CurrentEnvironment + conf.LoadYamlConfig("../app/conf/mock-data/mock-config.yaml", "BaseUnitTests") + + called := false + httpMessage := "" + subscribedServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + buf := new(strings.Builder) + io.Copy(buf, r.Body) + httpMessage = buf.String() + + called = true + w.WriteHeader(200) + })) + + defer func() { + server.Close() + subscribedServer.Close() + test.ResetResources() + app.CurrentEnvironment = defaultEnv + }() + + sdkConfig, _ := config.LoadDefaultConfig(context.TODO()) + sdkConfig.BaseEndpoint = aws.String(server.URL) + snsClient := sns.NewFromConfig(sdkConfig) + + app.SyncTopics.Lock() + app.SyncTopics.Topics["unit-topic-http"].Subscriptions[0].Protocol = "https" + app.SyncTopics.Topics["unit-topic-http"].Subscriptions[0].EndPoint = subscribedServer.URL + app.SyncTopics.Unlock() + + topicArn := app.SyncTopics.Topics["unit-topic-http"].Arn + message := "{\"IAm\": \"aMessage\"}" + response, err := snsClient.Publish(context.TODO(), &sns.PublishInput{ + TopicArn: &topicArn, + Message: &message, + }) + + assert.Nil(t, err) + assert.NotNil(t, response) + + assert.True(t, called) + assert.Equal(t, "\"{\\\"IAm\\\": \\\"aMessage\\\"}\"", httpMessage) +} + +func Test_Publish_https_json_not_raw(t *testing.T) { + server := generateServer() + defaultEnv := app.CurrentEnvironment + conf.LoadYamlConfig("../app/conf/mock-data/mock-config.yaml", "BaseUnitTests") + + called := false + httpMessage := "" + subscribedServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + buf := new(strings.Builder) + io.Copy(buf, r.Body) + httpMessage = buf.String() + + called = true + w.WriteHeader(200) + })) + + defer func() { + server.Close() + subscribedServer.Close() + test.ResetResources() + app.CurrentEnvironment = defaultEnv + }() + + sdkConfig, _ := config.LoadDefaultConfig(context.TODO()) + sdkConfig.BaseEndpoint = aws.String(server.URL) + snsClient := sns.NewFromConfig(sdkConfig) + + app.SyncTopics.Lock() + app.SyncTopics.Topics["unit-topic-http"].Subscriptions[0].Protocol = "https" + app.SyncTopics.Topics["unit-topic-http"].Subscriptions[0].Raw = false + app.SyncTopics.Topics["unit-topic-http"].Subscriptions[0].EndPoint = subscribedServer.URL + app.SyncTopics.Unlock() + + topicArn := app.SyncTopics.Topics["unit-topic-http"].Arn + message := "{\"IAm\": \"aMessage\"}" + subject := "I am a subject" + response, err := snsClient.Publish(context.TODO(), &sns.PublishInput{ + TopicArn: &topicArn, + Message: &message, + Subject: &subject, + }) + + assert.Nil(t, err) + assert.NotNil(t, response) + + assert.True(t, called) + assert.Contains(t, httpMessage, "\"Message\":\"{\\\"IAm\\\": \\\"aMessage\\\"}\"") + assert.Contains(t, httpMessage, "Type") + assert.Contains(t, httpMessage, "MessageId") + assert.Contains(t, httpMessage, "TopicArn") + assert.Contains(t, httpMessage, subject) + assert.Contains(t, httpMessage, "Signature") + assert.Contains(t, httpMessage, "SigningCertURL") + assert.Contains(t, httpMessage, "UnsubscribeURL") + assert.Contains(t, httpMessage, "SubscribeURL") + assert.Contains(t, httpMessage, "MessageAttributes") +} + +func Test_Publish_sqs_xml_raw(t *testing.T) { + server := generateServer() + defaultEnv := app.CurrentEnvironment + conf.LoadYamlConfig("../app/conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + server.Close() + test.ResetResources() + app.CurrentEnvironment = defaultEnv + }() + + e := httpexpect.Default(t, server.URL) + + topicArn := app.SyncTopics.Topics["unit-topic1"].Arn + message := "{\"IAm\": \"aMessage\"}" + subject := "I am a subject" + + requestBody := struct { + Action string `schema:"Action"` + TopicArn string `schema:"TopicArn"` + Message string `schema:"Message"` + Subject string `schema:"Subject"` + }{ + Action: "Publish", + TopicArn: topicArn, + Message: message, + Subject: subject, + } + + e.POST("/"). + WithForm(requestBody). + Expect(). + Status(http.StatusOK). + Body().Raw() + + messages := app.SyncQueues.Queues["subscribed-queue1"].Messages + assert.Len(t, messages, 1) + assert.Equal(t, message, string(messages[0].MessageBody)) +} + +func Test_Publish_sqs_xml_not_raw(t *testing.T) { + server := generateServer() + defaultEnv := app.CurrentEnvironment + conf.LoadYamlConfig("../app/conf/mock-data/mock-config.yaml", "BaseUnitTests") + defer func() { + server.Close() + test.ResetResources() + app.CurrentEnvironment = defaultEnv + }() + + e := httpexpect.Default(t, server.URL) + + topicArn := app.SyncTopics.Topics["unit-topic3"].Arn + message := "{\"IAm\": \"aMessage\"}" + subject := "I am a subject" + + requestBody := struct { + Action string `schema:"Action"` + TopicArn string `schema:"TopicArn"` + Message string `schema:"Message"` + Subject string `schema:"Subject"` + }{ + Action: "Publish", + TopicArn: topicArn, + Message: message, + Subject: subject, + } + + e.POST("/"). + WithForm(requestBody). + Expect(). + Status(http.StatusOK). + Body().Raw() + + messages := app.SyncQueues.Queues["subscribed-queue3"].Messages + assert.Len(t, messages, 1) + + body := string(messages[0].MessageBody) + assert.Contains(t, body, "\"Message\":\"{\\\"IAm\\\": \\\"aMessage\\\"}\"") + assert.Contains(t, body, "Type") + assert.Contains(t, body, "MessageId") + assert.Contains(t, body, "TopicArn") + assert.Contains(t, body, subject) + assert.Contains(t, body, "Signature") + assert.Contains(t, body, "SigningCertURL") + assert.Contains(t, body, "UnsubscribeURL") + assert.Contains(t, body, "SubscribeURL") + assert.Contains(t, body, "MessageAttributes") +} + +func Test_Publish_http_xml(t *testing.T) { + server := generateServer() + defaultEnv := app.CurrentEnvironment + conf.LoadYamlConfig("../app/conf/mock-data/mock-config.yaml", "BaseUnitTests") + + called := false + httpMessage := "" + subscribedServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + buf := new(strings.Builder) + io.Copy(buf, r.Body) + httpMessage = buf.String() + + called = true + w.WriteHeader(200) + })) + + defer func() { + server.Close() + subscribedServer.Close() + test.ResetResources() + app.CurrentEnvironment = defaultEnv + }() + + e := httpexpect.Default(t, server.URL) + + app.SyncTopics.Lock() + app.SyncTopics.Topics["unit-topic-http"].Subscriptions[0].EndPoint = subscribedServer.URL + app.SyncTopics.Unlock() + + topicArn := app.SyncTopics.Topics["unit-topic-http"].Arn + message := "{\"IAm\": \"aMessage\"}" + subject := "I am a subject" + + requestBody := struct { + Action string `schema:"Action"` + TopicArn string `schema:"TopicArn"` + Message string `schema:"Message"` + Subject string `schema:"Subject"` + }{ + Action: "Publish", + TopicArn: topicArn, + Message: message, + Subject: subject, + } + + e.POST("/"). + WithForm(requestBody). + Expect(). + Status(http.StatusOK). + Body().Raw() + + assert.True(t, called) + assert.Equal(t, "\"{\\\"IAm\\\": \\\"aMessage\\\"}\"", httpMessage) +} + +func Test_Publish_https_xml_raw(t *testing.T) { + server := generateServer() + defaultEnv := app.CurrentEnvironment + conf.LoadYamlConfig("../app/conf/mock-data/mock-config.yaml", "BaseUnitTests") + + called := false + httpMessage := "" + subscribedServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + buf := new(strings.Builder) + io.Copy(buf, r.Body) + httpMessage = buf.String() + + called = true + w.WriteHeader(200) + })) + + defer func() { + server.Close() + subscribedServer.Close() + test.ResetResources() + app.CurrentEnvironment = defaultEnv + }() + + e := httpexpect.Default(t, server.URL) + + app.SyncTopics.Lock() + app.SyncTopics.Topics["unit-topic-http"].Subscriptions[0].Protocol = "https" + app.SyncTopics.Topics["unit-topic-http"].Subscriptions[0].EndPoint = subscribedServer.URL + app.SyncTopics.Unlock() + + topicArn := app.SyncTopics.Topics["unit-topic-http"].Arn + message := "{\"IAm\": \"aMessage\"}" + subject := "I am a subject" + + requestBody := struct { + Action string `schema:"Action"` + TopicArn string `schema:"TopicArn"` + Message string `schema:"Message"` + Subject string `schema:"Subject"` + }{ + Action: "Publish", + TopicArn: topicArn, + Message: message, + Subject: subject, + } + + e.POST("/"). + WithForm(requestBody). + Expect(). + Status(http.StatusOK). + Body().Raw() + + assert.True(t, called) + assert.Equal(t, "\"{\\\"IAm\\\": \\\"aMessage\\\"}\"", httpMessage) +} + +func Test_Publish_https_xml_not_raw(t *testing.T) { + server := generateServer() + defaultEnv := app.CurrentEnvironment + conf.LoadYamlConfig("../app/conf/mock-data/mock-config.yaml", "BaseUnitTests") + + called := false + httpMessage := "" + subscribedServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + buf := new(strings.Builder) + io.Copy(buf, r.Body) + httpMessage = buf.String() + + called = true + w.WriteHeader(200) + })) + + defer func() { + server.Close() + subscribedServer.Close() + test.ResetResources() + app.CurrentEnvironment = defaultEnv + }() + + e := httpexpect.Default(t, server.URL) + + app.SyncTopics.Lock() + app.SyncTopics.Topics["unit-topic-http"].Subscriptions[0].Protocol = "https" + app.SyncTopics.Topics["unit-topic-http"].Subscriptions[0].Raw = false + app.SyncTopics.Topics["unit-topic-http"].Subscriptions[0].EndPoint = subscribedServer.URL + app.SyncTopics.Unlock() + + topicArn := app.SyncTopics.Topics["unit-topic-http"].Arn + message := "{\"IAm\": \"aMessage\"}" + subject := "I am a subject" + + requestBody := struct { + Action string `schema:"Action"` + TopicArn string `schema:"TopicArn"` + Message string `schema:"Message"` + Subject string `schema:"Subject"` + }{ + Action: "Publish", + TopicArn: topicArn, + Message: message, + Subject: subject, + } + + e.POST("/"). + WithForm(requestBody). + Expect(). + Status(http.StatusOK). + Body().Raw() + + assert.True(t, called) + assert.Contains(t, httpMessage, "\"Message\":\"{\\\"IAm\\\": \\\"aMessage\\\"}\"") + assert.Contains(t, httpMessage, "Type") + assert.Contains(t, httpMessage, "MessageId") + assert.Contains(t, httpMessage, "TopicArn") + assert.Contains(t, httpMessage, subject) + assert.Contains(t, httpMessage, "Signature") + assert.Contains(t, httpMessage, "SigningCertURL") + assert.Contains(t, httpMessage, "UnsubscribeURL") + assert.Contains(t, httpMessage, "SubscribeURL") + assert.Contains(t, httpMessage, "MessageAttributes") +}