-
Notifications
You must be signed in to change notification settings - Fork 147
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bugfix around message attributes on Publish(SNS), ReceiveMessages(SQS) #339
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package models | ||
|
||
import ( | ||
"encoding/base64" | ||
"encoding/json" | ||
"fmt" | ||
"net/url" | ||
|
@@ -13,8 +14,6 @@ import ( | |
log "github.com/sirupsen/logrus" | ||
) | ||
|
||
var caser = cases.Title(language.AmericanEnglish) | ||
|
||
type CreateQueueRequest struct { | ||
QueueName string `json:"QueueName" schema:"QueueName"` | ||
Attributes QueueAttributes `json:"Attributes" schema:"Attribute"` | ||
|
@@ -205,12 +204,18 @@ func (r *SendMessageRequest) SetAttributesFromForm(values url.Values) { | |
} | ||
|
||
stringValue := values.Get(fmt.Sprintf("MessageAttribute.%d.Value.StringValue", i)) | ||
binaryValue := values.Get(fmt.Sprintf("MessageAttribute.%d.Value.BinaryValue", i)) | ||
encodedBinaryValue := values.Get(fmt.Sprintf("MessageAttribute.%d.Value.BinaryValue", i)) | ||
|
||
binaryValue, err := base64.StdEncoding.DecodeString(encodedBinaryValue) | ||
if err != nil { | ||
log.Warnf("Failed to base64 decode. %s may not have been base64 encoded.", encodedBinaryValue) | ||
continue | ||
} | ||
|
||
r.MessageAttributes[name] = MessageAttribute{ | ||
DataType: dataType, | ||
StringValue: stringValue, | ||
BinaryValue: []byte(binaryValue), | ||
BinaryValue: binaryValue, | ||
} | ||
} | ||
} | ||
|
@@ -257,16 +262,22 @@ func (r *SendMessageBatchRequest) SetAttributesFromForm(values url.Values) { | |
} | ||
|
||
stringValue := values.Get(fmt.Sprintf("Entries.%d.MessageAttributes.%d.Value.StringValue", entryIndex, attributeIndex)) | ||
binaryValue := values.Get(fmt.Sprintf("Entries.%d.MessageAttributes.%d.Value.BinaryValue", entryIndex, attributeIndex)) | ||
encodedBinaryValue := values.Get(fmt.Sprintf("Entries.%d.MessageAttributes.%d.Value.BinaryValue", entryIndex, attributeIndex)) | ||
|
||
if r.Entries[entryIndex].MessageAttributes == nil { | ||
r.Entries[entryIndex].MessageAttributes = make(map[string]MessageAttribute) | ||
} | ||
|
||
binaryValue, err := base64.StdEncoding.DecodeString(encodedBinaryValue) | ||
if err != nil { | ||
log.Warnf("Failed to base64 decode. %s may not have been base64 encoded.", encodedBinaryValue) | ||
continue | ||
} | ||
|
||
r.Entries[entryIndex].MessageAttributes[name] = MessageAttribute{ | ||
DataType: dataType, | ||
StringValue: stringValue, | ||
BinaryValue: []byte(binaryValue), | ||
BinaryValue: binaryValue, | ||
Comment on lines
-260
to
+280
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the same fix for bug 1. |
||
} | ||
|
||
if _, ok := r.Entries[entryIndex].MessageAttributes[name]; !ok { | ||
|
@@ -741,15 +752,21 @@ func (r *PublishRequest) SetAttributesFromForm(values url.Values) { | |
} | ||
|
||
stringValue := values.Get(fmt.Sprintf("MessageAttributes.entry.%d.Value.StringValue", i)) | ||
binaryValue := values.Get(fmt.Sprintf("MessageAttributes.entry.%d.Value.BinaryValue", i)) | ||
encodedBinaryValue := values.Get(fmt.Sprintf("MessageAttributes.entry.%d.Value.BinaryValue", i)) | ||
binaryValue, err := base64.StdEncoding.DecodeString(encodedBinaryValue) | ||
if err != nil { | ||
log.Warnf("Failed to base64 decode. %s may not have been base64 encoded.", encodedBinaryValue) | ||
continue | ||
} | ||
|
||
if r.MessageAttributes == nil { | ||
r.MessageAttributes = make(map[string]MessageAttribute) | ||
} | ||
|
||
attributes[name] = MessageAttribute{ | ||
DataType: caser.String(dataType), // capitalize | ||
DataType: cases.Title(language.AmericanEnglish).String(dataType), // capitalize | ||
Comment on lines
-750
to
+767
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is the fix for Bug 2 "DataType value was broken by As I wrote in the description, I don't know the exact conditions under which this occurs, but since there was no need to make it a global variable, I changed it to a local definition and the problem was resolved. If you have any ideas, I would appreciate your comments. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we need to understand more about why you want to change this. If you don't know the exact conditions does that mean you can't recreate it? If so, what are we fixing, and beyond that how can we create a test that makes sure the issue is fixed? In the short term, I like the pattern of it being global. If I need to uppercase anything else, I want to use the same instance of Can you provide a little more information before we move forward with this? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, I thought you said so 😅. It's a little creepy when the behavior isn't clear. Let's dive into the caser code and take a closer look 👍 |
||
StringValue: stringValue, | ||
BinaryValue: []byte(binaryValue), | ||
BinaryValue: binaryValue, | ||
Comment on lines
-744
to
+769
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the same fix for bug 1. |
||
} | ||
} | ||
r.MessageAttributes = attributes | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -314,7 +314,7 @@ func TestSendMessageRequest_SetAttributesFromForm_success(t *testing.T) { | |
attr2 := r.MessageAttributes["Attr2"] | ||
assert.Equal(t, "Binary", attr2.DataType) | ||
assert.Empty(t, attr2.StringValue) | ||
assert.Equal(t, []uint8("VmFsdWUy"), attr2.BinaryValue) | ||
assert.Equal(t, []uint8("Value2"), attr2.BinaryValue) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Internal value should be raw (not base 64 encoded) value. |
||
} | ||
|
||
func TestSetQueueAttributesRequest_SetAttributesFromForm_success(t *testing.T) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package models | ||
|
||
import ( | ||
"encoding/base64" | ||
"encoding/xml" | ||
) | ||
|
||
|
@@ -83,6 +84,9 @@ func (r *ResultMessage) MarshalXML(e *xml.Encoder, start xml.StartElement) error | |
} | ||
var messageAttrs []MessageAttributes | ||
for key, value := range r.MessageAttributes { | ||
if value.DataType == "Binary" { | ||
value.BinaryValue = []byte(base64.StdEncoding.EncodeToString(value.BinaryValue)) | ||
} | ||
Comment on lines
+87
to
+89
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is the related fix for Bug 1. In the AWS query protocol, blobs are sent and received after being base64 encoded , so the binary value should be base64 encoded. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know about this. I understand that's part of the query protocol, but that whole thing is deprecated in AWS. Based on my tests this works, as is, for the existing sdks I've tried. What exactly isn't working for you? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As you said this part is for the query protocol. The latest version of AWS SDK for Java also works without this part. I thought that future versions of GOAWS still keep support the query protocol, so I added it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, no, that's not the policy per say. I hadn't given it that much thought if I'm honest. But if you think about it like this - this is a tool for folks to run their own apps against instead AWS directly. So my main goal is to make it work in a performant/lightweight way and replicate as much of the AWS SQS/SNS "contract" as I can. So with that in mind - we don't NEED to match what AWS does, as long as we respond the same way that it would. We just need to match their contracts right? At any rate we can't know what they're doing all the times as their docs are generally garbage in my opinion. So, I don't care about things like security, and base64 encoding much, because, it's really up to the user here to worry about their input and output - because it's an app running where a user puts it. "Do what you will with it." so to speak, as long as an SDK doesn't fail on it - "garbage in garbage" out. Here's what I need: Sorry - this is a lot for me to juggle long term and I need a little grooming/refining (in agile terms), before I let it in. Otherwise I'll be adrift if someone else complains about it or something similar later. As always thank you for giving it so much work and thought. I really appreciate it and without you I wouldn't be able to keep this tool as current and viable as it is. |
||
attribute := MessageAttributes{ | ||
Name: key, | ||
Value: value, | ||
|
@@ -95,6 +99,9 @@ func (r *ResultMessage) MarshalXML(e *xml.Encoder, start xml.StartElement) error | |
e.EncodeElement(r.MessageId, xml.StartElement{Name: xml.Name{Local: "MessageId"}}) | ||
e.EncodeElement(r.ReceiptHandle, xml.StartElement{Name: xml.Name{Local: "ReceiptHandle"}}) | ||
e.EncodeElement(r.MD5OfBody, xml.StartElement{Name: xml.Name{Local: "MD5OfBody"}}) | ||
if r.MessageAttributes != nil { | ||
e.EncodeElement(r.MD5OfMessageAttributes, xml.StartElement{Name: xml.Name{Local: "MD5OfMessageAttributes"}}) | ||
} | ||
Comment on lines
+102
to
+104
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is the fix for Bug 3 "MD5OfMessageAttributes is missing on the message with message attributes from SQS-ReceiveMessages". There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure why I left this out. I think there was a reason though. A lot of this had to do with the raw vs regular message handling. Can you tell me more about what your issues are on this? |
||
e.EncodeElement(r.Body, xml.StartElement{Name: xml.Name{Local: "Body"}}) | ||
e.EncodeElement(attrs, xml.StartElement{Name: xml.Name{Local: "Attribute"}}) | ||
e.EncodeElement(messageAttrs, xml.StartElement{Name: xml.Name{Local: "MessageAttribute"}}) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,7 +63,7 @@ func Test_ResultMessage_MarshalXML_success_with_attributes(t *testing.T) { | |
resultString := string(result) | ||
|
||
// We have to assert piecemeal like this, the maps go into their lists unordered, which will randomly break this. | ||
entry := "<ResultMessage><MessageId>message-id</MessageId><ReceiptHandle>receipt-handle</ReceiptHandle><MD5OfBody>body-md5</MD5OfBody><Body>message-body</Body>" | ||
entry := "<ResultMessage><MessageId>message-id</MessageId><ReceiptHandle>receipt-handle</ReceiptHandle><MD5OfBody>body-md5</MD5OfBody><MD5OfMessageAttributes>message-attrs-md5</MD5OfMessageAttributes><Body>message-body</Body>" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Assertion update for Bug fix 3. |
||
assert.Contains(t, resultString, entry) | ||
|
||
entry = "<Attribute><Name>ApproximateFirstReceiveTimestamp</Name><Value>1</Value></Attribute>" | ||
|
@@ -81,7 +81,7 @@ func Test_ResultMessage_MarshalXML_success_with_attributes(t *testing.T) { | |
entry = "<MessageAttribute><Name>attr1</Name><Value><DataType>String</DataType><StringValue>string-value</StringValue></Value></MessageAttribute>" | ||
assert.Contains(t, resultString, entry) | ||
|
||
entry = "<MessageAttribute><Name>attr2</Name><Value><BinaryValue>binary-value</BinaryValue><DataType>Binary</DataType></Value></MessageAttribute>" | ||
entry = "<MessageAttribute><Name>attr2</Name><Value><BinaryValue>YmluYXJ5LXZhbHVl</BinaryValue><DataType>Binary</DataType></Value></MessageAttribute>" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Assertion update for blobs are sent and received after being base64 encoded. |
||
assert.Contains(t, resultString, entry) | ||
|
||
entry = "<MessageAttribute><Name>attr3</Name><Value><DataType>Number</DataType><StringValue>number-value</StringValue></Value></MessageAttribute>" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package smoke_tests | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/json" | ||
"io" | ||
|
@@ -64,7 +65,92 @@ func Test_Publish_sqs_json_raw(t *testing.T) { | |
assert.Equal(t, message, *receivedMessage.Messages[0].Body) | ||
} | ||
|
||
func Test_Publish_Sqs_With_Message_Attributes(t *testing.T) { | ||
func Test_Publish_sqs_json_with_message_attributes_raw(t *testing.T) { | ||
server := generateServer() | ||
defer func() { | ||
server.Close() | ||
models.ResetResources() | ||
}() | ||
|
||
sdkConfig, _ := config.LoadDefaultConfig(context.TODO()) | ||
sdkConfig.BaseEndpoint = aws.String(server.URL) | ||
sqsClient := sqs.NewFromConfig(sdkConfig) | ||
snsClient := sns.NewFromConfig(sdkConfig) | ||
|
||
createQueueResult, _ := sqsClient.CreateQueue(context.TODO(), &sqs.CreateQueueInput{ | ||
QueueName: &af.QueueName, | ||
}) | ||
|
||
topicName := aws.String("unit-topic2") | ||
|
||
createTopicResult, _ := snsClient.CreateTopic(context.TODO(), &sns.CreateTopicInput{ | ||
Name: topicName, | ||
}) | ||
|
||
subscribeResult, _ := snsClient.Subscribe(context.TODO(), &sns.SubscribeInput{ | ||
Protocol: aws.String("sqs"), | ||
TopicArn: createTopicResult.TopicArn, | ||
Attributes: map[string]string{}, | ||
Endpoint: createQueueResult.QueueUrl, | ||
ReturnSubscriptionArn: true, | ||
}) | ||
|
||
snsClient.SetSubscriptionAttributes(context.TODO(), &sns.SetSubscriptionAttributesInput{ | ||
SubscriptionArn: subscribeResult.SubscriptionArn, | ||
AttributeName: aws.String("RawMessageDelivery"), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This new test pattern is for "RawMessageDelivery". |
||
AttributeValue: aws.String("true"), | ||
}) | ||
message := "{\"IAm\": \"aMessage\"}" | ||
subject := "I am a subject" | ||
stringKey := "string-key" | ||
binaryKey := "binary-key" | ||
numberKey := "number-key" | ||
stringValue := "string-value" | ||
binaryValue := []byte("binary-value") | ||
numberValue := "100" | ||
attributes := map[string]types.MessageAttributeValue{ | ||
stringKey: { | ||
StringValue: aws.String(stringValue), | ||
DataType: aws.String("String"), | ||
}, | ||
binaryKey: { | ||
BinaryValue: binaryValue, | ||
DataType: aws.String("Binary"), | ||
}, | ||
numberKey: { | ||
StringValue: aws.String(numberValue), | ||
DataType: aws.String("Number"), | ||
}, | ||
} | ||
|
||
publishResponse, publishErr := snsClient.Publish(context.TODO(), &sns.PublishInput{ | ||
TopicArn: createTopicResult.TopicArn, | ||
Message: &message, | ||
Subject: &subject, | ||
MessageAttributes: attributes, | ||
}) | ||
|
||
receiveMessageResponse, receiveErr := sqsClient.ReceiveMessage(context.TODO(), &sqs.ReceiveMessageInput{ | ||
QueueUrl: createQueueResult.QueueUrl, | ||
}) | ||
|
||
assert.Nil(t, publishErr) | ||
assert.NotNil(t, publishResponse) | ||
|
||
assert.Nil(t, receiveErr) | ||
assert.NotNil(t, receiveMessageResponse) | ||
assert.Equal(t, message, *receiveMessageResponse.Messages[0].Body) | ||
|
||
assert.Equal(t, "649b2c548f103e499304eda4d6d4c5a2", *receiveMessageResponse.Messages[0].MD5OfBody) | ||
assert.Equal(t, "ddfbe54b92058bf5b5f00055fa2032a5", *receiveMessageResponse.Messages[0].MD5OfMessageAttributes) | ||
|
||
assert.Equal(t, stringValue, *receiveMessageResponse.Messages[0].MessageAttributes[stringKey].StringValue) | ||
assert.True(t, bytes.Equal(binaryValue, receiveMessageResponse.Messages[0].MessageAttributes[binaryKey].BinaryValue)) | ||
assert.Equal(t, numberValue, *receiveMessageResponse.Messages[0].MessageAttributes[numberKey].StringValue) | ||
|
||
} | ||
|
||
func Test_Publish_sqs_json_with_message_attributes_not_raw(t *testing.T) { | ||
server := generateServer() | ||
defer func() { | ||
server.Close() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -268,7 +268,7 @@ func Test_ReceiveMessageV1_xml_with_attributes(t *testing.T) { | |
assert.Equal(t, 1, len(receiveMessageResponse.Result.Messages)) | ||
assert.Equal(t, "MyTestMessage", receiveMessageResponse.Result.Messages[0].Body) | ||
assert.Equal(t, "ad4883a84ad41c79aa3a373698c0d4e9", receiveMessageResponse.Result.Messages[0].MD5OfBody) | ||
assert.Equal(t, "", receiveMessageResponse.Result.Messages[0].MD5OfMessageAttributes) | ||
assert.Equal(t, "ae8770938aee44bc548cf65ac377e3bf", receiveMessageResponse.Result.Messages[0].MD5OfMessageAttributes) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. xml/json both should have hash for message attribute if the message attributes is not empty. |
||
|
||
entry := "<Attribute><Name>ApproximateFirstReceiveTimestamp</Name><Value>" | ||
assert.Contains(t, response, entry) | ||
|
@@ -288,6 +288,6 @@ func Test_ReceiveMessageV1_xml_with_attributes(t *testing.T) { | |
entry = "<MessageAttribute><Name>attr2</Name><Value><DataType>Number</DataType><StringValue>number-value</StringValue></Value></MessageAttribute>" | ||
assert.Contains(t, response, entry) | ||
|
||
entry = "<MessageAttribute><Name>attr3</Name><Value><BinaryValue>binary-value</BinaryValue><DataType>Binary</DataType></Value></MessageAttribute>" | ||
entry = "<MessageAttribute><Name>attr3</Name><Value><BinaryValue>YmluYXJ5LXZhbHVl</BinaryValue><DataType>Binary</DataType></Value></MessageAttribute>" | ||
assert.Contains(t, response, entry) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ package smoke_tests | |
|
||
import ( | ||
"context" | ||
"encoding/base64" | ||
"encoding/xml" | ||
"fmt" | ||
"net/http" | ||
|
@@ -348,7 +349,8 @@ func TestSendMessageBatchV1_Xml_Success_including_attributes(t *testing.T) { | |
stringType := "String" | ||
numberType := "Number" | ||
|
||
binaryValue := "binary-value" | ||
binaryValue := []byte("binary-value") | ||
binaryValueEncodeString := base64.StdEncoding.EncodeToString([]byte("binary-value")) | ||
stringValue := "string-value" | ||
numberValue := "100" | ||
|
||
|
@@ -370,7 +372,7 @@ func TestSendMessageBatchV1_Xml_Success_including_attributes(t *testing.T) { | |
WithFormField("Entries.1.MessageBody", messageBody2). | ||
WithFormField("Entries.1.MessageAttributes.1.Name", binaryAttributeKey). | ||
WithFormField("Entries.1.MessageAttributes.1.Value.DataType", binaryType). | ||
WithFormField("Entries.1.MessageAttributes.1.Value.BinaryValue", binaryValue). | ||
WithFormField("Entries.1.MessageAttributes.1.Value.BinaryValue", binaryValueEncodeString). | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is also for blobs are sent and received after being base64 encoded. |
||
WithFormField("Entries.1.MessageAttributes.2.Name", stringAttributeKey). | ||
WithFormField("Entries.1.MessageAttributes.2.Value.DataType", stringType). | ||
WithFormField("Entries.1.MessageAttributes.2.Value.StringValue", stringValue). | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This fixes Bug 1 "Received Message Attribute via XML is not decoded from base 64".
JSON requests has decoded from base 64 on JSON marshaller, but for XML, here
SetAttributesFromForm
does not have decoding step. So internalBinaryValue
was still base 64 string (as binary) value.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really love this. I know it's more true to life, since that input is supposed to be encoded, but I'd prefer to make the decoding optional. It's a testing tool after all, and we're doing our best to cover the needs of all. It's not like we're throwing errors for invalid attributes or anything.
So maybe instead of your continue, the log is fine, but take the value as is and stuff it in the resulting attribute?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK! I will update it as the below 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry @kojisaikiAtSony - I'm looking at this again, and more of my previous thoughts are coming back to me. Looks to me like this change is decoding it, storing it internally, and then upon receiving the message, encoding it, and returning it. Double check me, is that what you're doing here?
If so, why do this at all? That's why I undid all this decoding/encoding business not long ago. It didn't seem like there was a point. Are you having trouble with the checksum or something because it's encoded? Where are you seeing the "Received Message Attribute via XML is not decoded from base 64" error, and how are you recreating it?