-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmessage_wrapper.py
249 lines (216 loc) · 8.87 KB
/
message_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Purpose
Demonstrate basic message operations in Amazon Simple Queue Service (Amazon SQS).
"""
# snippet-start:[python.example_code.sqs.message_wrapper_imports]
import logging
import sys
import boto3
from botocore.exceptions import ClientError
import queue_wrapper
from os import environ
logger = logging.getLogger(__name__)
sqs = boto3.resource('sqs',aws_access_key_id=environ['SQS_AWS_ACCESS_KEY_ID'],
aws_secret_access_key=environ['SQS_AWS_SECRET_ACCESS_KEY'],
region_name=environ['SQS_REGION'])
# snippet-end:[python.example_code.sqs.message_wrapper_imports]
# snippet-start:[python.example_code.sqs.SendMessage]
def send_message(queue, message_body, message_attributes=None):
"""
Send a message to an Amazon SQS queue.
:param queue: The queue that receives the message.
:param message_body: The body text of the message.
:param message_attributes: Custom attributes of the message. These are key-value
pairs that can be whatever you want.
:return: The response from SQS that contains the assigned message ID.
"""
if not message_attributes:
message_attributes = {}
try:
response = queue.send_message(
MessageBody=message_body,
MessageAttributes=message_attributes
)
except ClientError as error:
logger.exception("Send message failed: %s", message_body)
raise error
else:
return response
# snippet-end:[python.example_code.sqs.SendMessage]
# snippet-start:[python.example_code.sqs.SendMessageBatch]
def send_messages(queue, messages):
"""
Send a batch of messages in a single request to an SQS queue.
This request may return overall success even when some messages were not sent.
The caller must inspect the Successful and Failed lists in the response and
resend any failed messages.
:param queue: The queue to receive the messages.
:param messages: The messages to send to the queue. These are simplified to
contain only the message body and attributes.
:return: The response from SQS that contains the list of successful and failed
messages.
"""
try:
entries = [{
'Id': str(ind),
'MessageBody': msg['body'],
'MessageAttributes': msg['attributes']
} for ind, msg in enumerate(messages)]
response = queue.send_messages(Entries=entries)
if 'Successful' in response:
for msg_meta in response['Successful']:
logger.info(
"Message sent: %s: %s",
msg_meta['MessageId'],
messages[int(msg_meta['Id'])]['body']
)
if 'Failed' in response:
for msg_meta in response['Failed']:
logger.warning(
"Failed to send: %s: %s",
msg_meta['MessageId'],
messages[int(msg_meta['Id'])]['body']
)
except ClientError as error:
logger.exception("Send messages failed to queue: %s", queue)
raise error
else:
return response
# snippet-end:[python.example_code.sqs.SendMessageBatch]
# snippet-start:[python.example_code.sqs.ReceiveMessage]
def receive_messages(queue, max_number, wait_time):
"""
Receive a batch of messages in a single request from an SQS queue.
:param queue: The queue from which to receive messages.
:param max_number: The maximum number of messages to receive. The actual number
of messages received might be less.
:param wait_time: The maximum time to wait (in seconds) before returning. When
this number is greater than zero, long polling is used. This
can result in reduced costs and fewer false empty responses.
:return: The list of Message objects received. These each contain the body
of the message and metadata and custom attributes.
"""
try:
messages = queue.receive_messages(
MessageAttributeNames=['All'],
MaxNumberOfMessages=max_number,
WaitTimeSeconds=wait_time
)
for msg in messages:
logger.info("Received message: %s: %s", msg.message_id, msg.body)
except ClientError as error:
logger.exception("Couldn't receive messages from queue: %s", queue)
raise error
else:
return messages
# snippet-end:[python.example_code.sqs.ReceiveMessage]
# snippet-start:[python.example_code.sqs.DeleteMessage]
def delete_message(message):
"""
Delete a message from a queue. Clients must delete messages after they
are received and processed to remove them from the queue.
:param message: The message to delete. The message's queue URL is contained in
the message's metadata.
:return: None
"""
try:
message.delete()
logger.info("Deleted message: %s", message.message_id)
except ClientError as error:
logger.exception("Couldn't delete message: %s", message.message_id)
raise error
# snippet-end:[python.example_code.sqs.DeleteMessage]
# snippet-start:[python.example_code.sqs.DeleteMessageBatch]
def delete_messages(queue, messages):
"""
Delete a batch of messages from a queue in a single request.
:param queue: The queue from which to delete the messages.
:param messages: The list of messages to delete.
:return: The response from SQS that contains the list of successful and failed
message deletions.
"""
try:
entries = [{
'Id': str(ind),
'ReceiptHandle': msg.receipt_handle
} for ind, msg in enumerate(messages)]
response = queue.delete_messages(Entries=entries)
if 'Successful' in response:
for msg_meta in response['Successful']:
logger.info("Deleted %s", messages[int(msg_meta['Id'])].receipt_handle)
if 'Failed' in response:
for msg_meta in response['Failed']:
logger.warning(
"Could not delete %s",
messages[int(msg_meta['Id'])].receipt_handle
)
except ClientError:
logger.exception("Couldn't delete messages from queue %s", queue)
else:
return response
# snippet-end:[python.example_code.sqs.DeleteMessageBatch]
# snippet-start:[python.example_code.sqs.Scenario_SendReceiveBatch]
def usage_demo():
"""
Shows how to:
* Read the lines from this Python file and send the lines in
batches of 10 as messages to a queue.
* Receive the messages in batches until the queue is empty.
* Reassemble the lines of the file and verify they match the original file.
"""
def pack_message(msg_path, msg_body, msg_line):
return {
'body': msg_body,
'attributes': {
'path': {'StringValue': msg_path, 'DataType': 'String'},
'line': {'StringValue': str(msg_line), 'DataType': 'String'}
}
}
def unpack_message(msg):
return (msg.message_attributes['path']['StringValue'],
msg.body,
int(msg.message_attributes['line']['StringValue']))
print('-'*88)
print("Welcome to the Amazon Simple Queue Service (Amazon SQS) demo!")
print('-'*88)
queue = queue_wrapper.create_queue('sqs-usage-demo-message-wrapper')
with open(__file__) as file:
lines = file.readlines()
line = 0
batch_size = 10
received_lines = [None]*len(lines)
print(f"Sending file lines in batches of {batch_size} as messages.")
while line < len(lines):
messages = [pack_message(__file__, lines[index], index)
for index in range(line, min(line + batch_size, len(lines)))]
line = line + batch_size
send_messages(queue, messages)
print('.', end='')
sys.stdout.flush()
print(f"Done. Sent {len(lines) - 1} messages.")
print(f"Receiving, handling, and deleting messages in batches of {batch_size}.")
more_messages = True
while more_messages:
received_messages = receive_messages(queue, batch_size, 2)
print('.', end='')
sys.stdout.flush()
for message in received_messages:
path, body, line = unpack_message(message)
received_lines[line] = body
if received_messages:
delete_messages(queue, received_messages)
else:
more_messages = False
print('Done.')
if all([lines[index] == received_lines[index] for index in range(len(lines))]):
print(f"Successfully reassembled all file lines!")
else:
print(f"Uh oh, some lines were missed!")
queue.delete()
print("Thanks for watching!")
print('-'*88)
# snippet-end:[python.example_code.sqs.Scenario_SendReceiveBatch]
if __name__ == '__main__':
usage_demo()