// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 // snippet-start:[sns.java.fifo_topics.create_topic] // Create API clients AWSCredentialsProvider credentials = getCredentials(); AmazonSNS sns = new AmazonSNSClient(credentials); AmazonSQS sqs = new AmazonSQSClient(credentials); // Create FIFO topic Map topicAttributes = new HashMap(); topicAttributes.put("FifoTopic", "true"); topicAttributes.put("ContentBasedDeduplication", "false"); String topicArn = sns.createTopic( new CreateTopicRequest() .withName("PriceUpdatesTopic.fifo") .withAttributes(topicAttributes) ).getTopicArn(); // Create FIFO queues Map queueAttributes = new HashMap(); queueAttributes.put("FifoQueue", "true"); // Disable content-based deduplication because messages published with the same body // might carry different attributes that must be processed independently. // The price management system uses the message attributes to define whether a given // price update applies to the wholesale application or to the retail application. queueAttributes.put("ContentBasedDeduplication", "false"); String wholesaleQueueUrl = sqs.createQueue( new CreateQueueRequest() .withName("WholesaleQueue.fifo") .withAttributes(queueAttributes) ).getQueueUrl(); String retailQueueUrl = sqs.createQueue( new CreateQueueRequest() .withName("RetailQueue.fifo") .withAttributes(queueAttributes) ).getQueueUrl(); // Subscribe FIFO queues to FIFO topic, setting required permissions String wholesaleSubscriptionArn = Topics.subscribeQueue(sns, sqs, topicArn, wholesaleQueueUrl); String retailSubscriptionArn = Topics.subscribeQueue(sns, sqs, topicArn, retailQueueUrl); // snippet-end:[sns.java.fifo_topics.create_topic] // snippet-start:[sns.java.fifo_topics.filter_policy] // Set the Amazon SNS subscription filter policies SNSMessageFilterPolicy wholesalePolicy = new SNSMessageFilterPolicy(); wholesalePolicy.addAttribute("business", "wholesale"); wholesalePolicy.apply(sns, wholesaleSubscriptionArn); SNSMessageFilterPolicy retailPolicy = new SNSMessageFilterPolicy(); retailPolicy.addAttribute("business", "retail"); retailPolicy.apply(sns, retailSubscriptionArn); // snippet-end:[sns.java.fifo_topics.filter_policy] // snippet-start:[sns.java.fifo_topics.publish] // Publish message to FIFO topic String subject = "Price Update"; String payload = "{\"product\": 214, \"price\": 79.99}"; String groupId = "PID-214"; String dedupId = UUID.randomUUID().toString(); String attributeName = "business"; String attributeValue = "wholesale"; Map attributes = new HashMap<>(); attributes.put( attributeName, new MessageAttributeValue() .withDataType("String") .withStringValue(attributeValue)); sns.publish( new PublishRequest() .withTopicArn(topicArn) .withSubject(subject) .withMessage(payload) .withMessageGroupId(groupId); .withMessageDeduplicationId(dedupId) .withMessageAttributes(attributes); // snippet-end:[sns.java.fifo_topics.publish]