SIGN IN SIGN UP

Welcome to the AWS Code Examples Repository. This repo contains code examples used in the AWS documentation, AWS SDK Developer Guides, and more. For more information, see the Readme.md file below.

0 0 1 Java
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Unit tests for queue_wrapper.py functions.
"""
import json
import time
import pytest
from botocore.exceptions import ClientError
import queue_wrapper
@pytest.mark.parametrize(
"attributes",
[
({}),
(
{
"MaximumMessageSize": str(4096),
"ReceiveMessageWaitTimeSeconds": str(10),
"VisibilityTimeout": str(300),
}
),
],
)
def test_create_standard_queue(make_stubber, make_unique_name, attributes):
"""Test that creating a standard queue returns a queue with the expected
form of URL."""
sqs_stubber = make_stubber(queue_wrapper.sqs.meta.client)
queue_name = make_unique_name("queue")
url = "url-" + queue_name
sqs_stubber.stub_create_queue(queue_name, attributes, url)
queue = queue_wrapper.create_queue(queue_name, attributes)
assert queue_name in queue.url
if not sqs_stubber.use_stubs:
queue_wrapper.remove_queue(queue)
@pytest.mark.parametrize(
"attributes",
[
({}),
({"MaximumMessageSize": str(1024), "ReceiveMessageWaitTimeSeconds": str(20)}),
],
)
def test_create_fifo_queue(make_stubber, make_unique_name, attributes):
"""Test that creating a FIFO queue returns a queue with the expected form of URL."""
sqs_stubber = make_stubber(queue_wrapper.sqs.meta.client)
# FIFO queues require a '.fifo' suffix on the queue name.
queue_name = make_unique_name("queue") + ".fifo"
attributes["FifoQueue"] = str(True)
url = "url-" + queue_name
sqs_stubber.stub_create_queue(queue_name, attributes, url)
queue = queue_wrapper.create_queue(queue_name, attributes)
assert queue.url.endswith(queue_name)
if not sqs_stubber.use_stubs:
queue_wrapper.remove_queue(queue)
def test_create_dead_letter_queue(make_stubber, make_unique_name):
"""
2020-03-18 14:47:10 -07:00
Test that creating a queue with an associated dead-letter queue results in
the source queue being listed in the dead-letter queue's source queue list.
2020-03-18 14:47:10 -07:00
A dead-letter queue is any queue that is designated as a dead-letter target
by another queue's redrive policy.
"""
sqs_stubber = make_stubber(queue_wrapper.sqs.meta.client)
dl_queue_name = make_unique_name("queue") + "_my_lost_messages"
dl_url = "url-" + dl_queue_name
queue_name = make_unique_name("queue")
url = "url-" + queue_name
2020-04-24 11:19:31 -07:00
# Create a dead-letter queue.
sqs_stubber.stub_create_queue(dl_queue_name, {}, dl_url)
dl_queue = queue_wrapper.create_queue(dl_queue_name)
sqs_stubber.stub_get_queue_attributes(dl_url, "arn:" + dl_queue_name)
# Create the source queue that sends dead-letter messages to the dead-letter queue.
# The redrive policy must be in JSON format. Note that its attribute names
2020-03-18 14:47:10 -07:00
# start with lowercase letters.
redrive_attributes = {
"RedrivePolicy": json.dumps(
{
"deadLetterTargetArn": dl_queue.attributes["QueueArn"],
"maxReceiveCount": str(5),
}
)
}
sqs_stubber.stub_create_queue(queue_name, redrive_attributes, url)
sqs_stubber.stub_list_dead_letter_source_queues(dl_url, [url])
queue = queue_wrapper.create_queue(queue_name, redrive_attributes)
sources = list(dl_queue.dead_letter_source_queues.all())
assert queue in sources
if not sqs_stubber.use_stubs:
queue_wrapper.remove_queue(queue)
queue_wrapper.remove_queue(dl_queue)
def test_create_queue_bad_name():
"""Test that creating a queue with invalid characters in the name raises
an exception."""
with pytest.raises(ClientError):
queue_wrapper.create_queue("Queue names cannot contain spaces!")
def test_create_standard_queue_with_fifo_extension(make_unique_name):
"""Test that creating a standard queue with the '.fifo' extension
raises an exception."""
with pytest.raises(ClientError):
queue_wrapper.create_queue(make_unique_name("queue") + ".fifo")
def test_create_fifo_queue_without_extension(make_unique_name):
"""Test that creating a FIFO queue without the '.fifo' extension
raises an exception."""
with pytest.raises(ClientError):
queue_wrapper.create_queue(
make_unique_name("queue"), attributes={"FifoQueue": str(True)}
)
def test_get_queue(make_stubber, make_unique_name):
2020-03-18 14:47:10 -07:00
"""Test that creating a queue and then getting it by name returns the same queue."""
sqs_stubber = make_stubber(queue_wrapper.sqs.meta.client)
queue_name = make_unique_name("queue")
url = "url-" + queue_name
sqs_stubber.stub_create_queue(queue_name, {}, url)
sqs_stubber.stub_get_queue_url(queue_name, url)
created = queue_wrapper.create_queue(queue_name)
gotten = queue_wrapper.get_queue(queue_name)
assert created == gotten
if not sqs_stubber.use_stubs:
queue_wrapper.remove_queue(created)
def test_get_queue_nonexistent(make_stubber, make_unique_name):
"""Test that getting a nonexistent queue raises an exception."""
sqs_stubber = make_stubber(queue_wrapper.sqs.meta.client)
queue_name = make_unique_name("queue")
url = "url-" + queue_name
sqs_stubber.stub_get_queue_url(
queue_name, url, error_code="AWS.SimpleQueueService.NonExistentQueue"
)
with pytest.raises(ClientError) as exc_info:
queue_wrapper.get_queue(queue_name)
assert (
exc_info.value.response["Error"]["Code"]
== "AWS.SimpleQueueService.NonExistentQueue"
)
def test_get_queues(make_stubber, make_unique_name):
"""
Test that creating some queues, then retrieving all the queues, returns a list
that contains at least all of the newly created queues.
"""
sqs_stubber = make_stubber(queue_wrapper.sqs.meta.client)
queue_name = make_unique_name("queue")
created_queues = []
for ind in range(1, 4):
name = queue_name + str(ind)
sqs_stubber.stub_create_queue(name, {}, "url-" + name)
created_queues.append(queue_wrapper.create_queue(name))
sqs_stubber.stub_list_queues([q.url for q in created_queues])
# When running against the real AWS service, SQS may not return new queues right
# away. Use a simple exponential backoff to retry until it succeeds.
retries = 0
intersect_queues = []
while not intersect_queues and retries <= 5:
time.sleep(min(retries * 2, 32))
gotten_queues = queue_wrapper.get_queues()
intersect_queues = [cq for cq in created_queues if cq in gotten_queues]
retries += 1
assert created_queues == intersect_queues
if not sqs_stubber.use_stubs:
for queue in created_queues:
queue_wrapper.remove_queue(queue)
def test_get_queues_prefix(make_stubber, make_unique_name):
"""
Test that creating some queues with a unique prefix, then retrieving a list of
queues that have that unique prefix, returns a list that contains exactly the
newly created queues.
"""
sqs_stubber = make_stubber(queue_wrapper.sqs.meta.client)
queue_name = make_unique_name("queue")
created_queues = []
for ind in range(1, 4):
name = queue_name + str(ind)
sqs_stubber.stub_create_queue(name, {}, "url-" + name)
created_queues.append(queue_wrapper.create_queue(name))
sqs_stubber.stub_list_queues([q.url for q in created_queues], queue_name)
gotten_queues = queue_wrapper.get_queues(queue_name)
assert created_queues == gotten_queues
if not sqs_stubber.use_stubs:
for queue in created_queues:
queue_wrapper.remove_queue(queue)
def test_get_queues_expect_none(make_stubber, make_unique_name):
"""Test that getting queues with a random prefix returns an empty list
and doesn't raise an exception."""
sqs_stubber = make_stubber(queue_wrapper.sqs.meta.client)
queue_name = make_unique_name("queue")
sqs_stubber.stub_list_queues([], queue_name)
queues = queue_wrapper.get_queues(queue_name)
assert not queues
def test_remove_queue(make_stubber, make_unique_name):
"""Test that creating a queue, deleting it, then trying to get it raises
an exception because the queue no longer exists."""
sqs_stubber = make_stubber(queue_wrapper.sqs.meta.client)
queue_name = make_unique_name("queue")
url = "url-" + queue_name
sqs_stubber.stub_create_queue(queue_name, {}, url)
sqs_stubber.stub_delete_queue(url)
sqs_stubber.stub_get_queue_url(
queue_name, url, error_code="AWS.SimpleQueueService.NonExistentQueue"
)
queue = queue_wrapper.create_queue(queue_name)
queue_wrapper.remove_queue(queue)
with pytest.raises(ClientError) as exc_info:
queue_wrapper.get_queue(queue_name)
assert (
exc_info.value.response["Error"]["Code"]
== "AWS.SimpleQueueService.NonExistentQueue"
)