Skip to content

Commit 5fdfff2

Browse files
dhumphreys01Admiral-Piett
authored andcommitted
Refactor MessageAttributes
1 parent df99a4b commit 5fdfff2

25 files changed

+631
-745
lines changed

app/gosns/gosns.go

Lines changed: 10 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ func getSubscription(subsArn string) *models.Subscription {
228228
}
229229

230230
func createMessageBody(subs *models.Subscription, entry interfaces.AbstractPublishEntry,
231-
messageAttributes map[string]models.SqsMessageAttributeValue) ([]byte, error) {
231+
messageAttributes map[string]models.MessageAttribute) (string, error) {
232232

233233
msgId := uuid.NewString()
234234
message := models.SNSMessage{
@@ -240,13 +240,13 @@ func createMessageBody(subs *models.Subscription, entry interfaces.AbstractPubli
240240
SignatureVersion: "1",
241241
SigningCertURL: fmt.Sprintf("http://%s:%s/SimpleNotificationService/%s.pem", models.CurrentEnvironment.Host, models.CurrentEnvironment.Port, msgId),
242242
UnsubscribeURL: fmt.Sprintf("http://%s:%s/?Action=Unsubscribe&SubscriptionArn=%s", models.CurrentEnvironment.Host, models.CurrentEnvironment.Port, subs.SubscriptionArn),
243-
MessageAttributes: formatAttributes(messageAttributes),
243+
MessageAttributes: messageAttributes,
244244
}
245245

246246
if models.MessageStructure(entry.GetMessageStructure()) == models.MessageStructureJSON {
247247
m, err := extractMessageFromJSON(entry.GetMessage(), subs.Protocol)
248248
if err != nil {
249-
return nil, err
249+
return "", err
250250
}
251251
message.Message = m
252252
} else {
@@ -261,29 +261,10 @@ func createMessageBody(subs *models.Subscription, entry interfaces.AbstractPubli
261261
}
262262

263263
byteMsg, _ := json.Marshal(message)
264-
return byteMsg, nil
265-
}
266-
267-
func formatAttributes(values map[string]models.SqsMessageAttributeValue) map[string]models.MessageAttributeValue {
268-
attr := make(map[string]models.MessageAttributeValue)
269-
for k, v := range values {
270-
if v.DataType == "String" {
271-
attr[k] = models.MessageAttributeValue{
272-
DataType: v.DataType,
273-
StringValue: v.Value,
274-
}
275-
} else {
276-
attr[k] = models.MessageAttributeValue{
277-
DataType: v.DataType,
278-
BinaryValue: v.Value, // TODO - this may need to be a []byte?
279-
}
280-
}
281-
}
282-
return attr
264+
return string(byteMsg), nil
283265
}
284266

285267
func publishHTTP(subs *models.Subscription, topicArn string, entry interfaces.AbstractPublishEntry) {
286-
messageAttributes := utils.ConvertToOldMessageAttributeValueStructure(entry.GetMessageAttributes())
287268
id := uuid.NewString()
288269
msg := models.SNSMessage{
289270
Type: "Notification",
@@ -295,7 +276,7 @@ func publishHTTP(subs *models.Subscription, topicArn string, entry interfaces.Ab
295276
SignatureVersion: "1",
296277
SigningCertURL: fmt.Sprintf("http://%s:%s/SimpleNotificationService/%s.pem", models.CurrentEnvironment.Host, models.CurrentEnvironment.Port, id),
297278
UnsubscribeURL: fmt.Sprintf("http://%s:%s/?Action=Unsubscribe&SubscriptionArn=%s", models.CurrentEnvironment.Host, models.CurrentEnvironment.Port, subs.SubscriptionArn),
298-
MessageAttributes: formatAttributes(messageAttributes),
279+
MessageAttributes: entry.GetMessageAttributes(),
299280
}
300281

301282
signature, err := signMessage(PrivateKEY, &msg)
@@ -318,8 +299,7 @@ func publishHTTP(subs *models.Subscription, topicArn string, entry interfaces.Ab
318299
// put it in the resulting `body`, so that's all that's in that field when the message is received. If it's not
319300
// raw, then we put all this other junk in there too, similar to how AWS stores its metadata in there.
320301
func publishSQS(subscription *models.Subscription, topic *models.Topic, entry interfaces.AbstractPublishEntry) error {
321-
messageAttributes := utils.ConvertToOldMessageAttributeValueStructure(entry.GetMessageAttributes())
322-
if subscription.FilterPolicy != nil && !subscription.FilterPolicy.IsSatisfiedBy(messageAttributes) {
302+
if subscription.FilterPolicy != nil && !subscription.FilterPolicy.IsSatisfiedBy(entry.GetMessageAttributes()) {
323303
return nil
324304
}
325305

@@ -333,8 +313,8 @@ func publishSQS(subscription *models.Subscription, topic *models.Topic, entry in
333313
msg := models.SqsMessage{}
334314

335315
if subscription.Raw {
336-
msg.MessageAttributes = messageAttributes
337-
msg.MD5OfMessageAttributes = utils.HashAttributes(messageAttributes)
316+
msg.MessageAttributes = entry.GetMessageAttributes()
317+
msg.MD5OfMessageAttributes = utils.HashAttributes(entry.GetMessageAttributes())
338318

339319
// NOTE: Admiral-Piett - commenting this out. I don't understand what this is supposed to achieve
340320
// for raw message delivery. I suspect this doesn't work at all, otherwise you'd have to match the
@@ -346,9 +326,9 @@ func publishSQS(subscription *models.Subscription, topic *models.Topic, entry in
346326
//} else {
347327
// msg.MessageBody = []byte(entry.GetMessage())
348328
//}
349-
msg.MessageBody = []byte(entry.GetMessage())
329+
msg.MessageBody = entry.GetMessage()
350330
} else {
351-
m, err := createMessageBody(subscription, entry, messageAttributes)
331+
m, err := createMessageBody(subscription, entry, entry.GetMessageAttributes())
352332
if err != nil {
353333
return err
354334
}

app/gosns/gosns_test.go

Lines changed: 8 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ func Test_publishSQS_filter_policy_not_satisfied_by_attributes(t *testing.T) {
9494
request := models.PublishRequest{
9595
TopicArn: topicArn,
9696
Message: message,
97-
MessageAttributes: map[string]models.MessageAttributeValue{
98-
"invalid": models.MessageAttributeValue{
97+
MessageAttributes: map[string]models.MessageAttribute{
98+
"invalid": models.MessageAttribute{
9999
DataType: "String",
100100
StringValue: "garbage",
101101
},
@@ -199,11 +199,11 @@ func TestCreateMessageBody_success_NoMessageAttributes(t *testing.T) {
199199
Subject: subject,
200200
}
201201

202-
result, err := createMessageBody(subs, msg, map[string]models.SqsMessageAttributeValue{})
202+
result, err := createMessageBody(subs, msg, map[string]models.MessageAttribute{})
203203
assert.Nil(t, err)
204204

205205
unmarshalledMessage := &models.SNSMessage{}
206-
json.Unmarshal(result, unmarshalledMessage)
206+
json.Unmarshal([]byte(result), unmarshalledMessage)
207207

208208
assert.Equal(t, "Notification", unmarshalledMessage.Type)
209209
assert.Equal(t, "", unmarshalledMessage.Token)
@@ -225,11 +225,10 @@ func TestCreateMessageBody_success_WithMessageAttributes(t *testing.T) {
225225
SubscriptionArn: "subs-arn",
226226
Raw: false,
227227
}
228-
attributes := map[string]models.SqsMessageAttributeValue{
228+
attributes := map[string]models.MessageAttribute{
229229
"test": {
230-
DataType: "String",
231-
ValueKey: "StringValue",
232-
Value: "test",
230+
DataType: "String",
231+
StringValue: "test",
233232
},
234233
}
235234

@@ -280,8 +279,8 @@ func TestCreateMessageBody_JSONMessageStructure_MissingDefaultKey(t *testing.T)
280279

281280
snsMessage, err := createMessageBody(subs, msg, nil)
282281

282+
assert.Equal(t, "", snsMessage)
283283
assert.Error(t, err)
284-
assert.Nil(t, snsMessage)
285284
}
286285

287286
func TestCreateMessageBody_JSONMessageStructure_SelectsProtocolSpecificMessageIfAvailable(t *testing.T) {
@@ -324,29 +323,6 @@ func TestCreateMessageBody_NonJsonMessageStructure_MessageContainingJson(t *test
324323
assert.Contains(t, string(snsMessage), "\"Message\":\"{\\\"default\\\": \\\"default message text\\\", \\\"sqs\\\": \\\"sqs message text\\\"}\"")
325324
}
326325

327-
func Test_formatAttributes_success(t *testing.T) {
328-
attrs := map[string]models.SqsMessageAttributeValue{
329-
"test1": models.SqsMessageAttributeValue{
330-
Name: "MyAttr",
331-
DataType: "String",
332-
Value: "value1",
333-
},
334-
"test2": models.SqsMessageAttributeValue{
335-
Name: "MyAttr",
336-
DataType: "String",
337-
Value: "value2",
338-
},
339-
}
340-
expected := map[string]models.MessageAttributeValue{
341-
"test1": {DataType: "String", StringValue: "value1"},
342-
"test2": {DataType: "String", StringValue: "value2"},
343-
}
344-
345-
result := formatAttributes(attrs)
346-
347-
assert.Equal(t, expected, result)
348-
}
349-
350326
func Test_publishMessageByTopic_sqs_success(t *testing.T) {
351327
defer func() {
352328
publishSqsMessageFunc = publishSQS

app/gosqs/change_message_visibility_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func TestChangeMessageVisibility_POST_SUCCESS(t *testing.T) {
2121
q := &models.Queue{
2222
Name: "testing",
2323
Messages: []models.SqsMessage{{
24-
MessageBody: []byte("test1"),
24+
MessageBody: "test1",
2525
ReceiptHandle: "123",
2626
}},
2727
}

app/gosqs/delete_message_batch_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,15 @@ func TestDeleteMessageBatchV1_success_all_message(t *testing.T) {
2626
Name: "testing",
2727
Messages: []models.SqsMessage{
2828
{
29-
MessageBody: []byte("test%20message%20body%201"),
29+
MessageBody: "test%20message%20body%201",
3030
ReceiptHandle: "test1",
3131
},
3232
{
33-
MessageBody: []byte("test%20message%20body%202"),
33+
MessageBody: "test%20message%20body%202",
3434
ReceiptHandle: "test2",
3535
},
3636
{
37-
MessageBody: []byte("test%20message%20body%203"),
37+
MessageBody: "test%20message%20body%203",
3838
ReceiptHandle: "test3",
3939
},
4040
},
@@ -89,11 +89,11 @@ func TestDeleteMessageBatchV1_success_not_found_message(t *testing.T) {
8989
Name: "testing",
9090
Messages: []models.SqsMessage{
9191
{
92-
MessageBody: []byte("test%20message%20body%201"),
92+
MessageBody: "test%20message%20body%201",
9393
ReceiptHandle: "test1",
9494
},
9595
{
96-
MessageBody: []byte("test%20message%20body%203"),
96+
MessageBody: "test%20message%20body%203",
9797
ReceiptHandle: "test3",
9898
},
9999
},

app/gosqs/delete_message_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ func TestDeleteMessage(t *testing.T) {
2020
q := &models.Queue{
2121
Name: "testing",
2222
Messages: []models.SqsMessage{{
23-
MessageBody: []byte("test1"),
23+
MessageBody: "test1",
2424
ReceiptHandle: "123",
2525
}},
2626
}

app/gosqs/message_attributes.go

Lines changed: 0 additions & 61 deletions
This file was deleted.

app/gosqs/receive_message.go

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func ReceiveMessageV1(req *http.Request) (int, interfaces.AbstractResponseBody)
119119
models.SyncQueues.Queues[queueName].LockGroup(msg.GroupID)
120120
}
121121

122-
messages = append(messages, getMessageResult(msg))
122+
messages = append(messages, buildResultMessage(msg))
123123

124124
numMsg++
125125
}
@@ -141,34 +141,19 @@ func ReceiveMessageV1(req *http.Request) (int, interfaces.AbstractResponseBody)
141141
return http.StatusOK, respStruct
142142
}
143143

144-
func getMessageResult(m *models.SqsMessage) *models.ResultMessage {
145-
msgMttrs := []*models.ResultMessageAttribute{}
146-
for _, attr := range m.MessageAttributes {
147-
msgMttrs = append(msgMttrs, getMessageAttributeResult(&attr))
148-
}
149-
150-
attrsMap := map[string]string{
151-
"ApproximateFirstReceiveTimestamp": fmt.Sprintf("%d", m.ReceiptTime.UnixNano()/int64(time.Millisecond)),
152-
"SenderId": models.CurrentEnvironment.AccountID,
153-
"ApproximateReceiveCount": fmt.Sprintf("%d", m.NumberOfReceives+1),
154-
"SentTimestamp": fmt.Sprintf("%d", time.Now().UTC().UnixNano()/int64(time.Millisecond)),
155-
}
156-
157-
var attrs []*models.ResultAttribute
158-
for k, v := range attrsMap {
159-
attrs = append(attrs, &models.ResultAttribute{
160-
Name: k,
161-
Value: v,
162-
})
163-
}
164-
144+
func buildResultMessage(m *models.SqsMessage) *models.ResultMessage {
165145
return &models.ResultMessage{
166146
MessageId: m.Uuid,
167147
Body: m.MessageBody,
168148
ReceiptHandle: m.ReceiptHandle,
169-
MD5OfBody: utils.GetMD5Hash(string(m.MessageBody)),
149+
MD5OfBody: utils.GetMD5Hash(m.MessageBody),
170150
MD5OfMessageAttributes: m.MD5OfMessageAttributes,
171-
MessageAttributes: msgMttrs,
172-
Attributes: attrs,
151+
MessageAttributes: m.MessageAttributes,
152+
Attributes: map[string]string{
153+
"ApproximateFirstReceiveTimestamp": fmt.Sprintf("%d", m.ReceiptTime.UnixNano()/int64(time.Millisecond)),
154+
"SenderId": models.CurrentEnvironment.AccountID,
155+
"ApproximateReceiveCount": fmt.Sprintf("%d", m.NumberOfReceives+1),
156+
"SentTimestamp": fmt.Sprintf("%d", time.Now().UTC().UnixNano()/int64(time.Millisecond)),
157+
},
173158
}
174159
}

0 commit comments

Comments
 (0)