Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PublishV1 for JSON support #315

Merged
merged 1 commit into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
# GoAws
[![Build Status](https://travis-ci.org/p4tin/goaws.svg?branch=master)](https://travis-ci.org/p4tin/goaws)

You are always welcome to [tweet the creator in chief](https://twitter.com/gocodecloud) or [buy him a coffee](https://www.paypal.me/p4tin)

Written in Go this is a clone of the AWS SQS/SNS systems. This system is designed to emulate SQS and SNS in a local environment so developers can test their interfaces without having to connect to the AWS Cloud and possibly incurring the expense, or even worse actually write to production topics/queues by mistake. If you see any problems or would like to see a new feature, please open an issue here in github. As well, I will logon to Gitter so we can discuss your deployment issues or the weather.


Expand Down
15 changes: 13 additions & 2 deletions app/conf/mock-data/mock-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,21 @@ BaseUnitTests:
- Name: unit-queue2
RedrivePolicy: '{"maxReceiveCount": 100, "deadLetterTargetArn":"arn:aws:sqs:us-east-1:100010001000:other-queue1"}'
- Name: other-queue1
- Name: subscribed-queue2
- Name: subscribed-queue1
- Name: subscribed-queue3
Topics:
- Name: unit-topic1
Subscriptions:
- QueueName: subscribed-queue2
- QueueName: subscribed-queue1
Raw: true
- Name: unit-topic2
- Name: unit-topic3
Subscriptions:
- QueueName: subscribed-queue3
Raw: false
- Name: unit-topic-http
Subscriptions:
- Protocol: http
EndPoint: http://over.ride.me/for/tests
TopicArn: arn:aws:sqs:region:accountID:unit-topic-http
Raw: true
202 changes: 2 additions & 200 deletions app/gosns/gosns.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,137 +329,8 @@ func DeleteTopic(w http.ResponseWriter, req *http.Request) {

}

// aws --endpoint-url http://localhost:47194 sns publish --topic-arn arn:aws:sns:yopa-local:000000000000:test1 --message "This is a test"
func Publish(w http.ResponseWriter, req *http.Request) {
content := req.FormValue("ContentType")
topicArn := req.FormValue("TopicArn")
subject := req.FormValue("Subject")
messageBody := req.FormValue("Message")
messageStructure := req.FormValue("MessageStructure")
messageAttributes := getMessageAttributesFromRequest(req)

arnSegments := strings.Split(topicArn, ":")
topicName := arnSegments[len(arnSegments)-1]

_, ok := app.SyncTopics.Topics[topicName]
if ok {
log.WithFields(log.Fields{
"topic": topicName,
"topicArn": topicArn,
"subject": subject,
}).Debug("Publish to Topic")
for _, subs := range app.SyncTopics.Topics[topicName].Subscriptions {
switch app.Protocol(subs.Protocol) {
case app.ProtocolSQS:
publishSQS(w, req, subs, messageBody, messageAttributes, subject, topicArn, topicName, messageStructure)
case app.ProtocolHTTP:
fallthrough
case app.ProtocolHTTPS:
publishHTTP(subs, messageBody, messageAttributes, subject, topicArn)
}
}
} else {
createErrorResponse(w, req, "TopicNotFound")
return
}

//Create the response
msgId, _ := common.NewUUID()
uuid, _ := common.NewUUID()
respStruct := app.PublishResponse{Xmlns: "http://queue.amazonaws.com/doc/2012-11-05/", Result: app.PublishResult{MessageId: msgId}, Metadata: app.ResponseMetadata{RequestId: uuid}}
SendResponseBack(w, req, respStruct, content)
}

func publishSQS(w http.ResponseWriter, req *http.Request,
subs *app.Subscription, messageBody string, messageAttributes map[string]app.MessageAttributeValue,
subject string, topicArn string, topicName string, messageStructure string) {
if subs.FilterPolicy != nil && !subs.FilterPolicy.IsSatisfiedBy(messageAttributes) {
return
}

endPoint := subs.EndPoint
uriSegments := strings.Split(endPoint, "/")
queueName := uriSegments[len(uriSegments)-1]
arnSegments := strings.Split(queueName, ":")
queueName = arnSegments[len(arnSegments)-1]

if _, ok := app.SyncQueues.Queues[queueName]; ok {
msg := app.Message{}

if subs.Raw == false {
m, err := CreateMessageBody(subs, messageBody, subject, messageStructure, messageAttributes)
if err != nil {
createErrorResponse(w, req, err.Error())
return
}

msg.MessageBody = m
} else {
msg.MessageAttributes = messageAttributes
msg.MD5OfMessageAttributes = common.HashAttributes(messageAttributes)
m, err := extractMessageFromJSON(messageBody, subs.Protocol)
if err == nil {
msg.MessageBody = []byte(m)
} else {
msg.MessageBody = []byte(messageBody)
}
}

msg.MD5OfMessageBody = common.GetMD5Hash(messageBody)
msg.Uuid, _ = common.NewUUID()
app.SyncQueues.Lock()
app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages, msg)
app.SyncQueues.Unlock()

log.Infof("%s: Topic: %s(%s), Message: %s\n", time.Now().Format("2006-01-02 15:04:05"), topicName, queueName, msg.MessageBody)
} else {
log.Infof("%s: Queue %s does not exist, message discarded\n", time.Now().Format("2006-01-02 15:04:05"), queueName)
}
}

func publishHTTP(subs *app.Subscription, messageBody string, messageAttributes map[string]app.MessageAttributeValue,
subject string, topicArn string) {
id, _ := common.NewUUID()
msg := app.SNSMessage{
Type: "Notification",
MessageId: id,
TopicArn: topicArn,
Subject: subject,
Message: messageBody,
Timestamp: time.Now().UTC().Format(time.RFC3339),
SignatureVersion: "1",
SigningCertURL: "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port + "/SimpleNotificationService/" + id + ".pem",
UnsubscribeURL: "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port + "/?Action=Unsubscribe&SubscriptionArn=" + subs.SubscriptionArn,
MessageAttributes: formatAttributes(messageAttributes),
}

signature, err := signMessage(PrivateKEY, &msg)
if err != nil {
log.Error(err)
} else {
msg.Signature = signature
}
err = callEndpoint(subs.EndPoint, subs.SubscriptionArn, msg, subs.Raw)
if err != nil {
log.WithFields(log.Fields{
"EndPoint": subs.EndPoint,
"ARN": subs.SubscriptionArn,
"error": err.Error(),
}).Error("Error calling endpoint")
}
}

func formatAttributes(values map[string]app.MessageAttributeValue) map[string]app.MsgAttr {
attr := make(map[string]app.MsgAttr)
for k, v := range values {
attr[k] = app.MsgAttr{
Type: v.DataType,
Value: v.Value,
}
}
return attr
}

// NOTE: The use case for this is to use GoAWS to call some external system with the message payload. Essentially
// it is a localized subscription to some non-AWS endpoint.
func callEndpoint(endpoint string, subArn string, msg app.SNSMessage, raw bool) error {
log.WithFields(log.Fields{
"sns": msg,
Expand Down Expand Up @@ -524,75 +395,6 @@ func callEndpoint(endpoint string, subArn string, msg app.SNSMessage, raw bool)
return nil
}

func getMessageAttributesFromRequest(req *http.Request) map[string]app.MessageAttributeValue {
attributes := make(map[string]app.MessageAttributeValue)

for i := 1; true; i++ {
name := req.FormValue(fmt.Sprintf("MessageAttributes.entry.%d.Name", i))
if name == "" {
break
}

dataType := req.FormValue(fmt.Sprintf("MessageAttributes.entry.%d.Value.DataType", i))
if dataType == "" {
log.Warnf("DataType of MessageAttribute %s is missing, MD5 checksum will most probably be wrong!\n", name)
continue
}

// StringListValue and BinaryListValue is currently not implemented
for _, valueKey := range [...]string{"StringValue", "BinaryValue"} {
value := req.FormValue(fmt.Sprintf("MessageAttributes.entry.%d.Value.%s", i, valueKey))
if value != "" {
attributes[name] = app.MessageAttributeValue{Name: name, DataType: dataType, Value: value, ValueKey: valueKey}
}
}

if _, ok := attributes[name]; !ok {
log.Warnf("StringValue or BinaryValue of MessageAttribute %s is missing, MD5 checksum will most probably be wrong!\n", name)
}
}

return attributes
}

func CreateMessageBody(subs *app.Subscription, msg string, subject string, messageStructure string,
messageAttributes map[string]app.MessageAttributeValue) ([]byte, error) {

msgId, _ := common.NewUUID()

message := app.SNSMessage{
Type: "Notification",
MessageId: msgId,
TopicArn: subs.TopicArn,
Subject: subject,
Timestamp: time.Now().UTC().Format(time.RFC3339),
SignatureVersion: "1",
SigningCertURL: "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port + "/SimpleNotificationService/" + msgId + ".pem",
UnsubscribeURL: "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port + "/?Action=Unsubscribe&SubscriptionArn=" + subs.SubscriptionArn,
MessageAttributes: formatAttributes(messageAttributes),
}

if app.MessageStructure(messageStructure) == app.MessageStructureJSON {
m, err := extractMessageFromJSON(msg, subs.Protocol)
if err != nil {
return nil, err
}
message.Message = m
} else {
message.Message = msg
}

signature, err := signMessage(PrivateKEY, &message)
if err != nil {
log.Error(err)
} else {
message.Signature = signature
}

byteMsg, _ := json.Marshal(message)
return byteMsg, nil
}

func extractMessageFromJSON(msg string, protocol string) (string, error) {
var msgWithProtocols map[string]string
if err := json.Unmarshal([]byte(msg), &msgWithProtocols); err != nil {
Expand Down
14 changes: 8 additions & 6 deletions app/gosns/gosns_create_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const (
messageAttributesKey = "MessageAttributes"
)

// TODO - Admiral-Piett - merge these with `publish_test.go`

// When simple message string is passed,
// it must be used for all subscribers (no matter the protocol)
func TestCreateMessageBody_NonJson(t *testing.T) {
Expand All @@ -27,7 +29,7 @@ func TestCreateMessageBody_NonJson(t *testing.T) {
Raw: false,
}

snsMessage, err := CreateMessageBody(subs, message, subject, messageStructureEmpty, make(map[string]app.MessageAttributeValue))
snsMessage, err := createMessageBody(subs, message, subject, messageStructureEmpty, make(map[string]app.MessageAttributeValue))
if err != nil {
t.Fatalf(`error creating SNS message: %s`, err)
}
Expand Down Expand Up @@ -69,7 +71,7 @@ func TestCreateMessageBody_OnlyDefaultValueInJson(t *testing.T) {
message := `{"default": "default message text", "http": "HTTP message text"}`
subject := "subject"

snsMessage, err := CreateMessageBody(subs, message, subject, messageStructureJSON, nil)
snsMessage, err := createMessageBody(subs, message, subject, messageStructureJSON, nil)
if err != nil {
t.Fatalf(`error creating SNS message: %s`, err)
}
Expand Down Expand Up @@ -112,7 +114,7 @@ func TestCreateMessageBody_OnlySqsValueInJson(t *testing.T) {
message := `{"sqs": "message text"}`
subject := "subject"

snsMessage, err := CreateMessageBody(subs, message, subject, messageStructureJSON, nil)
snsMessage, err := createMessageBody(subs, message, subject, messageStructureJSON, nil)
if err == nil {
t.Fatalf(`error expected but instead SNS message was returned: %s`, snsMessage)
}
Expand All @@ -130,7 +132,7 @@ func TestCreateMessageBody_BothDefaultAndSqsValuesInJson(t *testing.T) {
message := `{"default": "default message text", "sqs": "sqs message text"}`
subject := "subject"

snsMessage, err := CreateMessageBody(subs, message, subject, messageStructureJSON, nil)
snsMessage, err := createMessageBody(subs, message, subject, messageStructureJSON, nil)
if err != nil {
t.Fatalf(`error creating SNS message: %s`, err)
}
Expand Down Expand Up @@ -173,7 +175,7 @@ func TestCreateMessageBody_NonJsonContainingJson(t *testing.T) {
message := `{"default": "default message text", "sqs": "sqs message text"}`
subject := "subject"

snsMessage, err := CreateMessageBody(subs, message, subject, "", nil)
snsMessage, err := createMessageBody(subs, message, subject, "", nil)
if err != nil {
t.Fatalf(`error creating SNS message: %s`, err)
}
Expand Down Expand Up @@ -219,7 +221,7 @@ func TestCreateMessageBody_WithMessageAttributes(t *testing.T) {
attributes := map[string]app.MessageAttributeValue{
stringMessageAttributeValue.DataType: stringMessageAttributeValue,
}
snsMessage, err := CreateMessageBody(subs, message, subject, messageStructureEmpty, attributes)
snsMessage, err := createMessageBody(subs, message, subject, messageStructureEmpty, attributes)
if err != nil {
t.Fatalf(`error creating SNS message: %s`, err)
}
Expand Down
Loading
Loading