
Using AWS - SQS FIFO - SpringBoot Api sender / JavaApplication receiver

Today we are going to look at how to use the AWS SQS service. It is a great and easy tool to use. In my case I used it to synchronize some desktop applications that need to be updated each time a user is created or deleted. The updates have to be applied in FIFO order, so instead of running rabbit-mq-server I used the AWS SQS service, which is really easy to set up.

Amazon Simple Queue Service (SQS) is a fully managed message queuing service that enables you to decouple and scale microservices, distributed systems, and serverless applications.

SQS offers two types of message queues. Standard queues offer maximum throughput, best-effort ordering, and at-least-once delivery. SQS FIFO queues are designed to guarantee that messages are processed exactly once, in the exact order that they are sent.

The desktop application was sending a request every 15 seconds to ask the API for new data to synchronize. That is 240 requests per hour and 5,760 requests per day from a single device; with 8 devices it adds up to 1,428,480 requests per month. That is a heavy load for a small instance running the API, so in my case a service like SQS was a nice solution: about 1,500,000 requests per month cost about a dollar, which is cheaper than running an instance just for rabbit-mq-server.
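The arithmetic behind those figures can be checked with a short sketch. The class and method names are illustrative, and the 31-day month is an assumption that reproduces the monthly total quoted above.

```java
final class PollingLoad {

    // Requests one device makes per day when it polls at a fixed interval.
    static long requestsPerDay(int intervalSeconds) {
        return (24L * 60 * 60) / intervalSeconds;
    }

    // Total requests for a number of devices over a number of days.
    static long requestsPerMonth(int intervalSeconds, int devices, int days) {
        return requestsPerDay(intervalSeconds) * devices * days;
    }

    public static void main(String[] args) {
        System.out.println(requestsPerDay(15));           // 5760
        System.out.println(requestsPerMonth(15, 8, 31));  // 1428480
    }
}
```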

Anyway, let's start building the application that sends data to the SQS FIFO queue and the application that gets the information from it.

The code that sends the data to the FIFO queue is in a Spring Boot API, and the code that gets the information from the FIFO queue is in a Maven Java application project.

Let's go to the SQS service in the AWS console and create a new FIFO queue.



Configure the queue: set the Default Visibility Timeout to 15 seconds and enable Content-Based Deduplication.
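The same settings can also be written as queue attributes in code. The sketch below only builds the attribute map with the JDK and does not call AWS. `FifoQueue`, `ContentBasedDeduplication` and `VisibilityTimeout` are SQS queue attribute names, while the class and method names are illustrative. A map like this is what the controller code further down passes to `CreateQueueRequest.withAttributes(...)`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class FifoQueueAttributes {

    // Builds the attribute map for a FIFO queue with content-based
    // deduplication and the given visibility timeout (in seconds).
    static Map<String, String> build(int visibilityTimeoutSeconds) {
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("FifoQueue", "true");
        attributes.put("ContentBasedDeduplication", "true");
        attributes.put("VisibilityTimeout", String.valueOf(visibilityTimeoutSeconds));
        return attributes;
    }

    public static void main(String[] args) {
        // prints {FifoQueue=true, ContentBasedDeduplication=true, VisibilityTimeout=15}
        System.out.println(build(15));
    }
}
```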



The FIFO queue has now been created.

The examples are based on the AWS Java sample code:

https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues-getting-started-java.html


Spring Boot controller that sends a JSON array to the FIFO queue and gets items from it

package com.awssqssender.AwsSenderSQS.Controller;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.stereotype.Controller;

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;

import java.util.List;
import org.json.JSONArray;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;

import com.amazonaws.services.sqs.model.*;
import com.awssqssender.AwsSenderSQS.Controller.Model.TestData;

import java.util.HashMap;
import java.util.Map;

@Controller
@RequestMapping("api")
public class MainController {

    @GetMapping("getfromfifo")
    public ResponseEntity<String> getTotalhoy() {

        return new ResponseEntity<String>(getMessages(), HttpStatus.OK);
    }

    public String getMessages() {

        String returnSTRING = "";
        // Placeholders: replace with your own IAM access key and secret key.
        String accessKey = "YOUR_ACCESS_KEY";
        String secretKey = "YOUR_SECRET_KEY";
        AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);

        final AmazonSQS sqs = new AmazonSQSClient(credentials);

        System.out.println("===============================================");
        System.out.println("Getting Started with Amazon SQS FIFO Queues");
        System.out.println("===============================================\n");

        try {

            final Map<String, String> attributes = new HashMap<>();

            attributes.put("FifoQueue", "true");

            attributes.put("ContentBasedDeduplication", "true");

            // The FIFO queue name must end with the .fifo suffix.
            final CreateQueueRequest createQueueRequest = new CreateQueueRequest("EXAMPLE.fifo")
                    .withAttributes(attributes);
            final String myQueueUrl = sqs.createQueue(createQueueRequest).getQueueUrl();

            System.out.println("Receiving messages from EXAMPLE.fifo.\n");
            final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);

            final List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
            for (final Message message : messages) {
                System.out.println("Message");
                System.out.println("  MessageId:     " + message.getMessageId());
                System.out.println("  ReceiptHandle: " + message.getReceiptHandle());
                System.out.println("  MD5OfBody:     " + message.getMD5OfBody());
                System.out.println("  Body:          " + message.getBody());
                returnSTRING = message.getBody();
            }
            System.out.println();

            if (messages.size() > 0) {
                System.out.println("Deleting the message.\n");
                final String messageReceiptHandle = messages.get(0).getReceiptHandle();
                sqs.deleteMessage(new DeleteMessageRequest(myQueueUrl, messageReceiptHandle));
            }

        } catch (final AmazonServiceException ase) {
            System.out.println(
                    "Caught an AmazonServiceException, which means " + "your request made it to Amazon SQS, but was "
                            + "rejected with an error response for some reason.");
            System.out.println("Error Message:    " + ase.getMessage());
            System.out.println("HTTP Status Code: " + ase.getStatusCode());
            System.out.println("AWS Error Code:   " + ase.getErrorCode());
            System.out.println("Error Type:       " + ase.getErrorType());
            System.out.println("Request ID:       " + ase.getRequestId());
        } catch (final AmazonClientException ace) {
            System.out.println("Caught an AmazonClientException, which means "
                    + "the client encountered a serious internal problem while "
                    + "trying to communicate with Amazon SQS, such as not " + "being able to access the network.");
            System.out.println("Error Message: " + ace.getMessage());
        }

        return returnSTRING;
    }

    @PostMapping("/sendtofifo")
    public ResponseEntity<String> sendSQS(@RequestBody List<TestData> test) {

        JSONArray jsArray = new JSONArray(test);
        sendingSQS(jsArray.toString());
        return new ResponseEntity<String>("Sending SQS", HttpStatus.OK);
    }

    public void sendingSQS(String message) {

        // Placeholders: replace with your own IAM access key and secret key.
        String accessKey = "YOUR_ACCESS_KEY";
        String secretKey = "YOUR_SECRET_KEY";
        AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);

        final AmazonSQS sqs = new AmazonSQSClient(credentials);
        try {

            final Map<String, String> attributes = new HashMap<>();

            // A FIFO queue must have the FifoQueue attribute set to true.
            attributes.put("FifoQueue", "true");

            /*
             * If the user doesn't provide a MessageDeduplicationId, generate a
             * MessageDeduplicationId based on the content.
             */
            attributes.put("ContentBasedDeduplication", "true");

            // The FIFO queue name must end with the .fifo suffix.
            final CreateQueueRequest createQueueRequest = new CreateQueueRequest("EXAMPLE.fifo")
                    .withAttributes(attributes);
            final String myQueueUrl = sqs.createQueue(createQueueRequest).getQueueUrl();

            // List all queues.
            /*
             * System.out.println("Listing all queues in your account.\n"); for (final
             * String queueUrl : sqs.listQueues().getQueueUrls()) {
             * System.out.println("  QueueUrl: " + queueUrl); } System.out.println();
             */

            // Send a message.
            System.out.println("Sending a message to EXAMPLE.fifo.\n");
            final SendMessageRequest sendMessageRequest = new SendMessageRequest(myQueueUrl, message);

            /*
             * When you send messages to a FIFO queue, you must provide a non-empty
             * MessageGroupId.
             */
            sendMessageRequest.setMessageGroupId("messageGroup1");

            // Uncomment the following to provide the MessageDeduplicationId
            // sendMessageRequest.setMessageDeduplicationId("1");
            final SendMessageResult sendMessageResult = sqs.sendMessage(sendMessageRequest);
            final String sequenceNumber = sendMessageResult.getSequenceNumber();
            final String messageId = sendMessageResult.getMessageId();
            System.out.println(
                    "SendMessage succeed with messageId " + messageId + ", sequence number " + sequenceNumber + "\n");

        } catch (final AmazonServiceException ase) {
            System.out.println(
                    "Caught an AmazonServiceException, which means " + "your request made it to Amazon SQS, but was "
                            + "rejected with an error response for some reason.");
            System.out.println("Error Message:    " + ase.getMessage());
            System.out.println("HTTP Status Code: " + ase.getStatusCode());
            System.out.println("AWS Error Code:   " + ase.getErrorCode());
            System.out.println("Error Type:       " + ase.getErrorType());
            System.out.println("Request ID:       " + ase.getRequestId());
        } catch (final AmazonClientException ace) {
            System.out.println("Caught an AmazonClientException, which means "
                    + "the client encountered a serious internal problem while "
                    + "trying to communicate with Amazon SQS, such as not " + "being able to access the network.");
            System.out.println("Error Message: " + ace.getMessage());
        }

    }

}




We are going to test it with Postman. Let's first try the send method with this request body:

[

{"data1":"1",
"data2":"2",
"hasmapList":[{"subdata":"APiDAcgq43NcwEE3CatxMIAUVZJE09VAb0AuyLBKoOk3BnBDxDRKqU4bhiP4WFATnaj86ipfekwA153kNXjLQ1810eVdfg0cWT0azq551L0p3IgLCTdVQ1+KEKaQytnglMNLKbkIRIsg3K5hJEOIfbtdrZ5N/8Ova2I3tP05cc41x9UIYEoT4scSpmSLU82Xk3miNveShQPB5qEMc4K/kvMcTcRYNiEAAEsZBobMp/15QdkoqVF+hjbeysqZEPkGqC8Lw3sYbtEp7YEmDtXRJi69UxN4bbKd3MvssXe207UL9MSYh9I5cRFkAFfOtMNSQxCgTqpYhR2h3wkbBBLXOewgHCgzxudiYyiR19dodjYmwMkEOhdgwTEIoTM+AGWurnL3BcHe/0oOIMokZzZbzgBXXpHeg4uiR21CFz8zU5TtQ8odqHK/VcxVM06F/N9S5eqgbO7usVseymAGDep+Dx/1Ks/cwhQFiw1MS9Ut/zizYpLMsvlVTs/xq5oOMD8RwbDpffHnsG8A+IEByCrjc1zAQTcJq3GwihRVkq223zDCyEegVRYx85SLAlpx2oUD2+Ulc9P7CFJYekRgPx44sdpf2MyCAJdj66M8q+94vTWb+jHYFTQYuAOtfUfcQmCIUyTb5lc/IklfK++fqnVF7UFYQAKfsRDMdS/zDYFqyKp04wIbfIvOxt8JVaq7OhyDJk5EhIOY9GO5ZkFblJY0+dpAZHEhtJ51Np3aGXjZTpXZ3bf0JkDkzvltN1nrUqOiAu0nZ6zaI3fq9adFvzI7guH+PnUneGaZdPeC7s5i1JXAxC5/uYKCQByZamxGOS5+9ihtyoJx9beo6SFeY9aRcYVBQgO8GwChHPCRS6BDSsUvlUkTD5LLeHnH7y9qY6Y2PsBA2HuPkAbMNVBUahuH3motx/nK9cu9IOthYnp6q6xEuC51Fx4EKB9dFtRCvKxz9prODIll4BxP7zlKcsuUxMTK3ipy1KlLcBlWqayxma7/4v/PCzonaBW44nIP0tL5BkJfgMXqnWHcFCoDfG8A+IEByCrjc1zAQTcJq3HwhhRVkpT7iaZNfwtYtjWzqf5CiJ2NZTPf498Ih5qiRDxzIwHC+/OB0AUHD/QSUf9uQElpbGyzJzUnZ8ln/Jlb56xCyKS01z775OS+zFkKOFIz6rymEiWndGkAZgCLDaM16oYexoohEFXGD1wKKSNFmdPMFHhkx32mttgSsz/+T46cifTyqOFeacR09mxmB8mLwBLOU1neV6vsIT36VMxbzfnzfDd8qD/l0k3xvLcnoh5dD/jB3f1SbApvMugVIjYL0vLi9qRUoLnR6NghOCAG7xwyZDsN3AexJEUOSpzkssqwSUhzgfPJxn0PXrwXcHAB87VHYzLvd32NEH5iP8qYx7szh7j7EH6Xn0ylmIV7Rg4DCCnQpEYTWSX4EeFQZDPt3q8abPUidlUIej4APDP7tAp40MI9QYjE2iLrdDwdO+voH6802tZ5pJnTgdVxe17l1AAl/9Pw1EDJg+bg7vRIXVo80rPImfY4m9ME77YsOplRpFJ7qW8A6IMByCrjc1zAQTcJq3EwnhRVkkPgl0pToutBSFCiZRVw80lFQmhj/mMOuVtBirFlhvx/J+vRbBJRuIn0jaIsKhS5b6RTYz+GNawd1RfeNFeTpPj92EEpPfXRozft3wWzykC9z4efDx0aEdhlXCf9aclf3MFnuqlzMVYNToTpFtOz2MkF87t0y7bZmFc9NYhKAC4HvwAu296HofRQRzoB7JK8Eg4So/+iRqY4fs8+I4agl79iUbRHq2SIGs44CGS9Vvs8OD96gJzcnCG1JTDR7vtoe5ZGkJ7o73lTppJwMWP4HvGa6jCN+xW0PFcBt52pz5N1gvdyAj7bqvwgVdfm5EFa5JkcRbjcSVwIqcMu9Gcc3c6/1sCJ/6/oL+4R5uc7WJbGPrCJsKJnqYAxNLdmP8bxQ+
IKU/o/z0R0hDHsLGV2hAwsoxh+McG1g69cPsHPUnNRIcyBnZC7Vf60DzQvMjlcKas6odZ4UeMohifYgO/nFDZPRr549vlGCjq158HZvPE27GUNbwD//wMA/P8DAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA0gAAAQAHAAAAgCk="}]}

]
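The controller imports a `TestData` model that is not shown in this post. A minimal sketch of what it could look like, inferred only from the field names in the request body above (`data1`, `data2`, `hasmapList`, `subdata`), follows. The field types are assumptions. In the Spring Boot project the class would be declared `public` in its own file, and the getters matter because `new JSONArray(test)` reads bean properties through them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model matching the JSON body used in the Postman test.
final class TestData {

    private String data1;
    private String data2;
    // Each entry is an object such as {"subdata": "..."}.
    private List<Map<String, String>> hasmapList = new ArrayList<>();

    public String getData1() { return data1; }
    public void setData1(String data1) { this.data1 = data1; }

    public String getData2() { return data2; }
    public void setData2(String data2) { this.data2 = data2; }

    public List<Map<String, String>> getHasmapList() { return hasmapList; }
    public void setHasmapList(List<Map<String, String>> hasmapList) { this.hasmapList = hasmapList; }

    public static void main(String[] args) {
        TestData sample = new TestData();
        sample.setData1("1");
        sample.setData2("2");
        sample.setHasmapList(List.of(Map.of("subdata", "example")));
        System.out.println(sample.getData1() + " " + sample.getData2() + " " + sample.getHasmapList());
    }
}
```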



Let's check the queue: there is one message available, ready to be received.


Let's see what is inside it.




The JSON string was sent successfully as a message.

Now let's try the get method.

Now let's look at the FIFO queue again: there are no messages left, because the message was received and deleted in the same call.



Keep in mind that the queue does not accept duplicate messages, so sending the same message twice will not add a second copy. If for any reason you need to send a repeated message, you have to include something that makes it different from the previous one, even though the record inside is the same. One way to do it is to send another attribute in the message, such as a date.





Although it is the same record, we can see that it was uploaded twice, because the date attribute is what makes the two messages different.
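With content-based deduplication enabled, SQS derives the deduplication ID from a SHA-256 hash of the message body (message attributes are not part of it), and a repeated body inside the deduplication interval is not delivered again. The sketch below illustrates the idea locally with the JDK only. It does not call SQS, and the `sentAt` field name and the helper names are assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class DedupDemo {

    // Hex-encoded SHA-256 of a message body.
    static String sha256Hex(String body) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(body.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }

    // Builds a JSON body for the same record plus a date field.
    static String withSentAt(String data1, String data2, String sentAt) {
        return "{\"data1\":\"" + data1 + "\",\"data2\":\"" + data2
                + "\",\"sentAt\":\"" + sentAt + "\"}";
    }

    public static void main(String[] args) {
        String first = withSentAt("1", "2", "2020-01-01T10:00:00Z");
        String second = withSentAt("1", "2", "2020-01-01T10:00:15Z");
        // Identical bodies hash to the same value: prints true
        System.out.println(sha256Hex(first).equals(sha256Hex(first)));
        // A different date changes the body and its hash: prints false
        System.out.println(sha256Hex(first).equals(sha256Hex(second)));
    }
}
```

An alternative that keeps the body untouched is to set an explicit deduplication ID per send, which the controller code hints at in the commented-out `setMessageDeduplicationId` line.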

Replace accessKey, secretKey and the .fifo queue name with your own.


If you need IAM credentials, go to Services -> IAM -> Users -> Add user, give it a name and check Access type: Programmatic access.

Click Next, select Attach existing policies directly and pick the policy that fits your needs. In this case, I am checking AmazonSQSFullAccess.



After that, create the user and copy the access key and the secret key.


