2020-01-13 09:49:45 -08:00
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2020-04-22 17:36:41 -07:00
# SPDX-License-Identifier: Apache-2.0
2020-01-13 09:49:45 -08:00
"""
2020-03-18 14:47:10 -07:00
Purpose
2021-10-22 16:15:10 -07:00
Demonstrate basic message operations in Amazon Simple Queue Service (Amazon SQS).
2020-01-13 09:49:45 -08:00
"""
2021-10-22 16:15:10 -07:00
# snippet-start:[python.example_code.sqs.message_wrapper_imports]
2020-01-13 09:49:45 -08:00
import logging
2020-04-22 17:36:41 -07:00
import sys
2020-01-13 09:49:45 -08:00
import boto3
from botocore . exceptions import ClientError
2020-04-22 17:36:41 -07:00
import queue_wrapper
2020-01-13 09:49:45 -08:00
# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)
# Amazon SQS service resource. The service name must be exactly "sqs"; any
# surrounding whitespace makes boto3 reject it as an unknown service.
sqs = boto3.resource("sqs")
2021-10-22 16:15:10 -07:00
# snippet-end:[python.example_code.sqs.message_wrapper_imports]
2020-01-13 09:49:45 -08:00
2021-10-22 16:15:10 -07:00
# snippet-start:[python.example_code.sqs.SendMessage]
2020-01-13 09:49:45 -08:00
def send_message(queue, message_body, message_attributes=None):
    """
    Send a single message to an Amazon SQS queue.

    :param queue: The queue that receives the message.
    :param message_body: The body text of the message.
    :param message_attributes: Custom attributes of the message, given as key-value
                               pairs. Any falsy value (including the default of
                               None) is sent as an empty dictionary.
    :return: The response from SQS that contains the assigned message ID.
    :raises ClientError: Re-raised after being logged when the request fails.
    """
    attributes = message_attributes or {}
    try:
        return queue.send_message(
            MessageBody=message_body, MessageAttributes=attributes
        )
    except ClientError:
        logger.exception(" Send message failed: %s ", message_body)
        raise
2023-10-18 10:35:05 -07:00
2021-10-22 16:15:10 -07:00
# snippet-end:[python.example_code.sqs.SendMessage]
2020-01-13 09:49:45 -08:00
2021-10-22 16:15:10 -07:00
# snippet-start:[python.example_code.sqs.SendMessageBatch]
2020-01-13 09:49:45 -08:00
def send_messages(queue, messages):
    """
    Send a batch of messages in a single request to an SQS queue.

    This request may return overall success even when some messages were not sent.
    The caller must inspect the Successful and Failed lists in the response and
    resend any failed messages.

    :param queue: The queue to receive the messages.
    :param messages: An indexable sequence of messages to send to the queue. Each
                     one is a dictionary that holds only the message body and
                     the message attributes.
    :return: The response from SQS that contains the list of successful and failed
             messages.
    :raises ClientError: Re-raised after being logged when the batch request fails.
    """
    # The " body " / " attributes " keys are internal to this file (they are what
    # the demo's pack_message produces), so they are read exactly as written
    # there. The entry keys, by contrast, are defined by the SQS API.
    # Each entry's Id is the message's position in `messages`, which lets a result
    # entry be mapped back to the message it describes.
    entries = [
        {
            "Id": str(ind),
            "MessageBody": msg[" body "],
            "MessageAttributes": msg[" attributes "],
        }
        for ind, msg in enumerate(messages)
    ]
    try:
        response = queue.send_messages(Entries=entries)
    except ClientError:
        logger.exception(" Send messages failed to queue: %s ", queue)
        raise

    for msg_meta in response.get("Successful", []):
        logger.info(
            " Message sent: %s : %s ",
            msg_meta["MessageId"],
            messages[int(msg_meta["Id"])][" body "],
        )
    # Failed entries describe an error and carry no MessageId, so they are
    # identified by the batch entry Id instead.
    for msg_meta in response.get("Failed", []):
        logger.warning(
            " Failed to send: %s : %s ",
            msg_meta["Id"],
            messages[int(msg_meta["Id"])][" body "],
        )
    return response
2023-10-18 10:35:05 -07:00
2021-10-22 16:15:10 -07:00
# snippet-end:[python.example_code.sqs.SendMessageBatch]
2020-01-13 09:49:45 -08:00
2021-10-22 16:15:10 -07:00
# snippet-start:[python.example_code.sqs.ReceiveMessage]
2020-01-13 09:49:45 -08:00
def receive_messages(queue, max_number, wait_time):
    """
    Receive a batch of messages in a single request from an SQS queue.

    :param queue: The queue from which to receive messages.
    :param max_number: The maximum number of messages to receive. The actual number
                       of messages received might be less.
    :param wait_time: The maximum time to wait (in seconds) before returning. When
                      this number is greater than zero, long polling is used. This
                      can result in reduced costs and fewer false empty responses.
    :return: The list of Message objects received. These each contain the body
             of the message and metadata and custom attributes.
    :raises ClientError: Re-raised after being logged when the request fails.
    """
    try:
        messages = queue.receive_messages(
            # "All" must be passed without padding to request every custom
            # attribute along with each message.
            MessageAttributeNames=["All"],
            MaxNumberOfMessages=max_number,
            WaitTimeSeconds=wait_time,
        )
    except ClientError:
        logger.exception(" Couldn ' t receive messages from queue: %s ", queue)
        raise

    for msg in messages:
        logger.info(" Received message: %s : %s ", msg.message_id, msg.body)
    return messages
2023-10-18 10:35:05 -07:00
2021-10-22 16:15:10 -07:00
# snippet-end:[python.example_code.sqs.ReceiveMessage]
2020-01-13 09:49:45 -08:00
2021-10-22 16:15:10 -07:00
# snippet-start:[python.example_code.sqs.DeleteMessage]
2020-01-13 09:49:45 -08:00
def delete_message(message):
    """
    Delete a message from a queue. Clients must delete messages after they
    are received and processed to remove them from the queue.

    :param message: The message to delete. The message's queue URL is contained in
                    the message's metadata.
    :return: None
    :raises ClientError: Re-raised after being logged when the deletion fails.
    """
    try:
        message.delete()
    except ClientError:
        logger.exception(" Couldn ' t delete message: %s ", message.message_id)
        raise
    else:
        logger.info(" Deleted message: %s ", message.message_id)
2023-10-18 10:35:05 -07:00
2021-10-22 16:15:10 -07:00
# snippet-end:[python.example_code.sqs.DeleteMessage]
2020-01-13 09:49:45 -08:00
2021-10-22 16:15:10 -07:00
# snippet-start:[python.example_code.sqs.DeleteMessageBatch]
2020-01-13 09:49:45 -08:00
def delete_messages(queue, messages):
    """
    Delete a batch of messages from a queue in a single request.

    Deletion is best-effort: a ClientError raised by the request is logged and
    not propagated.

    :param queue: The queue from which to delete the messages.
    :param messages: An indexable sequence of messages to delete. Each one must
                     expose a receipt_handle attribute.
    :return: The response from SQS that contains the list of successful and failed
             message deletions, or None when the request raised a ClientError.
    """
    # Each entry's Id is the message's position in `messages`, which lets a result
    # entry be mapped back to the message it describes.
    entries = [
        {"Id": str(ind), "ReceiptHandle": msg.receipt_handle}
        for ind, msg in enumerate(messages)
    ]
    try:
        response = queue.delete_messages(Entries=entries)
    except ClientError:
        # Deliberately not re-raised; the caller receives None instead.
        logger.exception(" Couldn ' t delete messages from queue %s ", queue)
        return None

    for msg_meta in response.get("Successful", []):
        logger.info(" Deleted %s ", messages[int(msg_meta["Id"])].receipt_handle)
    for msg_meta in response.get("Failed", []):
        logger.warning(
            " Could not delete %s ", messages[int(msg_meta["Id"])].receipt_handle
        )
    return response
2023-10-18 10:35:05 -07:00
2021-10-22 16:15:10 -07:00
# snippet-end:[python.example_code.sqs.DeleteMessageBatch]
2020-04-22 17:36:41 -07:00
2021-10-22 16:15:10 -07:00
# snippet-start:[python.example_code.sqs.Scenario_SendReceiveBatch]
2020-04-22 17:36:41 -07:00
def usage_demo():
    """
    Shows how to:
    * Read the lines from this Python file and send the lines in
      batches of 10 as messages to a queue.
    * Receive the messages in batches until the queue is empty.
    * Reassemble the lines of the file and verify they match the original file.

    The demo queue is deleted when the demo ends, including when it ends with an
    error.
    """

    def pack_message(msg_path, msg_body, msg_line):
        """Build the message dictionary that send_messages expects."""
        # The " body " / " attributes " keys are the ones send_messages reads in
        # this file. The attribute names and DataType values go to SQS and so
        # must not contain padding.
        return {
            " body ": msg_body,
            " attributes ": {
                "path": {"StringValue": msg_path, "DataType": "String"},
                "line": {"StringValue": str(msg_line), "DataType": "String"},
            },
        }

    def unpack_message(msg):
        """Return the (path, body, line number) carried by a received message."""
        return (
            msg.message_attributes["path"]["StringValue"],
            msg.body,
            int(msg.message_attributes["line"]["StringValue"]),
        )

    print(" - " * 88)
    print(" Welcome to the Amazon Simple Queue Service (Amazon SQS) demo! ")
    print(" - " * 88)

    queue = queue_wrapper.create_queue("sqs-usage-demo-message-wrapper")
    try:
        with open(__file__, encoding="utf-8") as file:
            lines = file.readlines()

        batch_size = 10
        # Messages can arrive in any order, so each body is stored at the line
        # index carried in its attributes.
        received_lines = [None] * len(lines)

        print(f" Sending file lines in batches of {batch_size} as messages. ")
        for start in range(0, len(lines), batch_size):
            messages = [
                pack_message(__file__, text, index)
                for index, text in enumerate(lines[start : start + batch_size], start)
            ]
            send_messages(queue, messages)
            print(" . ", end=" ")
            sys.stdout.flush()
        print(f" Done. Sent {len(lines)} messages. ")

        print(
            f" Receiving, handling, and deleting messages in batches of {batch_size} . "
        )
        # An empty receive is treated as the signal that the queue is drained.
        while True:
            received_messages = receive_messages(queue, batch_size, 2)
            print(" . ", end=" ")
            sys.stdout.flush()
            if not received_messages:
                break
            for message in received_messages:
                _, body, line_number = unpack_message(message)
                received_lines[line_number] = body
            delete_messages(queue, received_messages)
        print(" Done. ")

        if lines == received_lines:
            print(" Successfully reassembled all file lines! ")
        else:
            print(" Uh oh, some lines were missed! ")
    finally:
        # Always remove the demo queue so a failed run does not leave it behind.
        queue.delete()

    print(" Thanks for watching! ")
    print(" - " * 88)
2021-10-22 16:15:10 -07:00
# snippet-end:[python.example_code.sqs.Scenario_SendReceiveBatch]
2020-04-22 17:36:41 -07:00
2023-10-18 10:35:05 -07:00
# Run the demo only when this file is executed as a script. The comparison value
# must be exactly "__main__" or the guard never matches.
if __name__ == "__main__":
    usage_demo()