Today we are going to be watching how to use the service AWS-SQS, it is an amazing and easy tool to use, in my case I just used it to synchronize some desktop applications which need to be updated each time a new user is created or deleted, but I need this to be done in the FIFO order, so instead of using rabbit-mq-server I used the AWS SQS service which is really to set it up.
Amazon Simple Queue Service (SQS) is a fully managed message queuing service that enables you to decouple and scale microservices, distributed systems, and serverless applications.
SQS offers two types of message queues. Standard queues offer maximum throughput, best-effort ordering, and at-least-once delivery. SQS FIFO queues are designed to guarantee that messages are processed exactly once, in the exact order that they are sent.
the desktop application was doing a request every 15 seconds asking the API for new data to be synchronized, but in an hour would be 240 requests, in a day would be 5.760 requests that just by one device, now doing the same calculation by 8 devices would be 1.428.480 requests per month, it would be a heavy-duty for a little instance running the API, so in my case using a service like SQS was a nice solution so that having about 1.500.000 requests per month is about a dollar. a cheaper solution than running an instance running just alone rabbit-mq-server.
anyway let's start doing the application which sends data to the SQS FIFO, and the application which gest the information from SQS FIFO
the source code which sends the data to the FIFO is in a SpringBoot API, and the code which gets the info from the FIFO is in Maven java application project.
Let's go to the SQS services in the AWS console and let's create
Configure the Queue and set the Default Visibility Timeout to 15 seconds and enable Content-Based Deduplication
the FIFO was already created
the examples were done using the AWS java code
https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues-getting-started-java.html
SpringBoot controller to send a JsonArray and get items from the FIFO
package com.awssqssender.AwsSenderSQS.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.stereotype.Controller;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import java.util.List;
import org.json.JSONArray;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.*;
import com.awssqssender.AwsSenderSQS.Controller.Model.TestData;
import java.util.HashMap;
import java.util.Map;
@Controller
@RequestMapping("api")
public class MainController {
@GetMapping("getfromfifo")
public ResponseEntity<String> getTotalhoy() {
return new ResponseEntity<String>(getMessages(), HttpStatus.OK);
}
public String getMessages() {
String returnSTRING = "";
String accessKey = "AKIAR4GJOBAHA3TEAU";
String secretKey = "4HFrqCVwKGcJtaq3p/OkVSKehcht36NL6jOnSZ";
AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
final AmazonSQS sqs = new AmazonSQSClient(credentials);
System.out.println("===============================================");
System.out.println("Getting Started with Amazon SQS Standard Queues");
System.out.println("===============================================\n");
try {
final Map<String, String> attributes = new HashMap<>();
attributes.put("FifoQueue", "true");
attributes.put("ContentBasedDeduplication", "true");
// The FIFO queue name must end with the .fifo suffix.
final CreateQueueRequest createQueueRequest = new CreateQueueRequest("EXAMPLE.fifo")
.withAttributes(attributes);
final String myQueueUrl = sqs.createQueue(createQueueRequest).getQueueUrl();
System.out.println("Receiving messages from F1.fifo.\n");
final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
final List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (final Message message : messages) {
System.out.println("Message");
System.out.println(" MessageId: " + message.getMessageId());
System.out.println(" ReceiptHandle: " + message.getReceiptHandle());
System.out.println(" MD5OfBody: " + message.getMD5OfBody());
System.out.println(" Body: " + message.getBody());
returnSTRING = message.getBody();
}
System.out.println();
if (messages.size() > 0) {
System.out.println("Deleting the message.\n");
final String messageReceiptHandle = messages.get(0).getReceiptHandle();
sqs.deleteMessage(new DeleteMessageRequest(myQueueUrl, messageReceiptHandle));
}
} catch (final AmazonServiceException ase) {
System.out.println(
"Caught an AmazonServiceException, which means " + "your request made it to Amazon SQS, but was "
+ "rejected with an error response for some reason.");
System.out.println("Error Message: " + ase.getMessage());
System.out.println("HTTP Status Code: " + ase.getStatusCode());
System.out.println("AWS Error Code: " + ase.getErrorCode());
System.out.println("Error Type: " + ase.getErrorType());
System.out.println("Request ID: " + ase.getRequestId());
} catch (final AmazonClientException ace) {
System.out.println("Caught an AmazonClientException, which means "
+ "the client encountered a serious internal problem while "
+ "trying to communicate with Amazon SQS, such as not " + "being able to access the network.");
System.out.println("Error Message: " + ace.getMessage());
}
return returnSTRING;
}
@PostMapping("/sendtofifo")
public ResponseEntity<String> sendSQS(@RequestBody List<TestData> test) {
JSONArray jsArray = new JSONArray(test);
sendingSQS(jsArray.toString());
return new ResponseEntity<String>("Sending SQS", HttpStatus.OK);
}
public void sendingSQS(String message) {
String accessKey = "AKIAR4GJOBAHA3TEAH";
String secretKey = "4HFrqCVwKGcJtaq3p/OkVSKehcht36NL6jOnSZ";
AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
final AmazonSQS sqs = new AmazonSQSClient(credentials);
try {
final Map<String, String> attributes = new HashMap<>();
// A FIFO queue must have the FifoQueue attribute set to true.
attributes.put("FifoQueue", "true");
/*
* If the user doesn't provide a MessageDeduplicationId, generate a
* MessageDeduplicationId based on the content.
*/
attributes.put("ContentBasedDeduplication", "true");
// The FIFO queue name must end with the .fifo suffix.
final CreateQueueRequest createQueueRequest = new CreateQueueRequest("EXAMPLE.fifo")
.withAttributes(attributes);
final String myQueueUrl = sqs.createQueue(createQueueRequest).getQueueUrl();
// List all queues.
/*
* System.out.println("Listing all queues in your account.\n"); for (final
* String queueUrl : sqs.listQueues().getQueueUrls()) {
* System.out.println(" QueueUrl: " + queueUrl); } System.out.println();
*/
// Send a message.
System.out.println("Sending a message to F1.fifo.\n");
final SendMessageRequest sendMessageRequest = new SendMessageRequest(myQueueUrl, message);
/*
* When you send messages to a FIFO queue, you must provide a non-empty
* MessageGroupId.
*/
sendMessageRequest.setMessageGroupId("messageGroup1");
// Uncomment the following to provide the MessageDeduplicationId
// sendMessageRequest.setMessageDeduplicationId("1");
final SendMessageResult sendMessageResult = sqs.sendMessage(sendMessageRequest);
final String sequenceNumber = sendMessageResult.getSequenceNumber();
final String messageId = sendMessageResult.getMessageId();
System.out.println(
"SendMessage succeed with messageId " + messageId + ", sequence number " + sequenceNumber + "\n");
} catch (final AmazonServiceException ase) {
System.out.println(
"Caught an AmazonServiceException, which means " + "your request made it to Amazon SQS, but was "
+ "rejected with an error response for some reason.");
System.out.println("Error Message: " + ase.getMessage());
System.out.println("HTTP Status Code: " + ase.getStatusCode());
System.out.println("AWS Error Code: " + ase.getErrorCode());
System.out.println("Error Type: " + ase.getErrorType());
System.out.println("Request ID: " + ase.getRequestId());
} catch (final AmazonClientException ace) {
System.out.println("Caught an AmazonClientException, which means "
+ "the client encountered a serious internal problem while "
+ "trying to communicate with Amazon SQS, such as not " + "being able to access the network.");
System.out.println("Error Message: " + ace.getMessage());
}
}
}
we are going to test it with Postman, let's first try the send method
Let's check it, there is a message available on the queue, ready to be gotten from there
Let's see what it has got inside
there was success sending the json string as a message
let's now try the get method
now let's see again the queue fifo, and there is any message any longer so that it was gotten and deleted at the same time
keep in mind that if you send the same message it does not support duplicated messages if for any reason you need to send a repeated message you need to set something which makes it different from the other message despite they are the same inside, and a way to do it would be sending another attribute like a date
despite it is the same record, we can see the same the record was uploaded twice because the date attribute is the one that makes the difference.
Replace accessKey and secretKey and your .fifo name by yours.
if you want to get IAM credentials, go to services -> IAM -> USERS -> ADD USER give it a name and check Accecss Type: Programatic Access
Click on next and select Attach existing policies directly and check your policy according to your needs, in this case, I am checking AmazonSQSFullAccess
and after that create the user and get the access key and the secret key
Comments
Post a Comment