Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


amazon web services - AWS FIFO SQS queue message is disappearing when I repost the same message even after successfully deleting it

I am facing a strange issue with SQS. To simplify my use case: I have 7 messages in a FIFO queue, and my standalone app should keep polling them in the same sequence indefinitely. For instance, the app reads message1, does some business processing, deletes the message, and reposts the same message to the tail of the same queue; these steps then repeat for the next message, endlessly. My expectation is that the app polls continuously and acts on the messages in the same sequence, but that is where the problem arises. When a message is read from the queue for the very first time, deleted, and reposted into the same queue, the reposted message is not present in the queue, even though sendMessage returns a successful SendMessageResult.

I have included the code below to reproduce the issue. Briefly, it creates the Test_Queue.fifo queue with Test_Queue_DLQ.fifo configured in its RedrivePolicy. Right after the queue is created, the message "Test_Message" is posted to Test_Queue.fifo (a MessageId is returned in the response). The code then long-polls the queue to read the message, iterates over ReceiveMessageResult#getMessages, and deletes each message (the delete call succeeds). After the successful deletion, the same message is reposted to the tail of the same queue (again, a MessageId is returned in the response). But the reposted message is not present in the queue. When I checked the AWS admin console, the message count was 0 in both the Messages available and Messages in flight sections, and the reposted message is not in the Test_Queue_DLQ.fifo queue either. As per the SQS docs, deleting a message removes it even if it is in flight, so reposting the same message should not be an issue. I suspect that SQS performs some equality comparison and skips the identical message during the visibility timeout interval to avoid duplicate messages in a distributed environment, but I couldn't get a clear picture.

Code snippet to simulate the above issue

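import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class SQSIssue {
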
    @Test
    void sqsMessageAbsenceIssueTest() {
        AmazonSQS amazonSQS = AmazonSQSClientBuilder.standard().withEndpointConfiguration(new AwsClientBuilder
                .EndpointConfiguration("https://sqs.us-east-2.amazonaws.com", "us-east-2"))
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
                        "<accessKey>", "<secretKey>"))).build();

        //create queue
        String queueUrl = createQueues(amazonSQS);
        String message = "Test_Message";
        String groupId = "Group1";

        //Sending message -> "Test_Message"
        sendMessage(amazonSQS, queueUrl, message, groupId);
        //Reading the message and deleting using message.getReceiptHandle()
        readAndDeleteMessage(amazonSQS, queueUrl);
        //Reposting the same message into the queue -> "Test_Message"
        sendMessage(amazonSQS, queueUrl, message, groupId);

        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest()
                .withQueueUrl(queueUrl)
                .withWaitTimeSeconds(5)
                .withMessageAttributeNames("All")
                .withVisibilityTimeout(30)
                .withMaxNumberOfMessages(10);
        ReceiveMessageResult receiveMessageResult = amazonSQS.receiveMessage(receiveMessageRequest);
        // The message was reposted after the deletion, so I expect it to be in the queue.
        // Against the real queue this assertion fails because the list is empty.
        Assertions.assertFalse(receiveMessageResult.getMessages().isEmpty());
    }

    private void readAndDeleteMessage(AmazonSQS amazonSQS, String queueUrl) {
        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest()
                .withQueueUrl(queueUrl)
                .withWaitTimeSeconds(5)
                .withMessageAttributeNames("All")
                .withVisibilityTimeout(30)
                .withMaxNumberOfMessages(10);
        ReceiveMessageResult receiveMessageResult = amazonSQS.receiveMessage(receiveMessageRequest);
        receiveMessageResult.getMessages().forEach(message -> amazonSQS.deleteMessage(queueUrl, message.getReceiptHandle()));

    }

    private String createQueues(AmazonSQS amazonSQS) {

        String queueName = "Test_Queue.fifo";
        String deadLetterQueueName = "Test_Queue_DLQ.fifo";

        //Creating DeadLetterQueue
        CreateQueueRequest createDeadLetterQueueRequest = new CreateQueueRequest()
                .addAttributesEntry("FifoQueue", "true")
                .addAttributesEntry("ContentBasedDeduplication", "true")
                .addAttributesEntry("VisibilityTimeout", "600")
                .addAttributesEntry("MessageRetentionPeriod", "262144");
        createDeadLetterQueueRequest.withQueueName(deadLetterQueueName);
        CreateQueueResult createDeadLetterQueueResult = amazonSQS.createQueue(createDeadLetterQueueRequest);
        GetQueueAttributesResult getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(createDeadLetterQueueResult.getQueueUrl())
                        .withAttributeNames("QueueArn"));
        String deadLetterQueueArn = getQueueAttributesResult.getAttributes().get("QueueArn");

        //Creating Actual Queue with DeadLetterQueue configured
        CreateQueueRequest createQueueRequest = new CreateQueueRequest()
                .addAttributesEntry("FifoQueue", "true")
                .addAttributesEntry("ContentBasedDeduplication", "true")
                .addAttributesEntry("VisibilityTimeout", "600")
                .addAttributesEntry("MessageRetentionPeriod", "262144");

        createQueueRequest.withQueueName(queueName);
        String reDrivePolicy = "{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\""
                + deadLetterQueueArn + "\"}";
        createQueueRequest.addAttributesEntry("RedrivePolicy", reDrivePolicy);
        CreateQueueResult createQueueResult = amazonSQS.createQueue(createQueueRequest);
        return createQueueResult.getQueueUrl();
    }

    private void sendMessage(AmazonSQS amazonSQS, String queueUrl, String message, String groupId) {
        SendMessageRequest sendMessageRequest = new SendMessageRequest()
                .withQueueUrl(queueUrl)
                .withMessageBody(message)
                .withMessageGroupId(groupId);
        SendMessageResult sendMessageResult = amazonSQS.sendMessage(sendMessageRequest);
        Assertions.assertNotNull(sendMessageResult.getMessageId());
    }
}

Question from: https://stackoverflow.com/questions/65841805/aws-fifo-sqs-queue-message-is-disappearing-when-i-repost-the-same-message-even-a


1 Reply


From "Using the Amazon SQS message deduplication ID":

The message deduplication ID is the token used for deduplication of sent messages. If a message with a particular message deduplication ID is sent successfully, any messages sent with the same message deduplication ID are accepted successfully but aren't delivered during the 5-minute deduplication interval.
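To make the quoted behavior concrete, here is a minimal in-memory sketch of it in plain Java. It is a toy model under stated assumptions, not how SQS is implemented: the class FifoDedupSketch and its methods are hypothetical names, time is passed in explicitly, and a null deduplication ID stands in for content-based deduplication, where SQS derives the ID from a SHA-256 hash of the message body. The key point it models is that deleting a message does not clear its deduplication record.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model of FIFO deduplication; hypothetical, not the SQS implementation.
class FifoDedupSketch {
    // Deduplication interval quoted above: 5 minutes.
    static final long DEDUP_WINDOW_MILLIS = 5 * 60 * 1000L;

    // Deduplication ID -> time it was accepted. Deleting a message does not clear this.
    private final Map<String, Long> acceptedAt = new HashMap<>();
    private final Deque<String> queue = new ArrayDeque<>();

    // Returns true if the message is enqueued, false if it is dropped as a duplicate.
    // A null dedupId mimics content-based deduplication (ID = SHA-256 of the body).
    boolean send(String body, String dedupId, long nowMillis) {
        String id = (dedupId != null) ? dedupId : sha256Hex(body);
        Long first = acceptedAt.get(id);
        if (first != null && nowMillis - first < DEDUP_WINDOW_MILLIS) {
            // Real SQS still reports a successful send here; it just never delivers.
            return false;
        }
        acceptedAt.put(id, nowMillis);
        queue.addLast(body);
        return true;
    }

    // Receives and deletes the head message; returns null when the queue is empty.
    String receiveAndDelete() {
        return queue.pollFirst();
    }

    // Hex-encoded SHA-256 of the text, used as the content-based deduplication ID.
    static String sha256Hex(String text) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 should be available on every JVM", e);
        }
    }
}
```

Replaying the question's steps against this model: send("Test_Message", null, 0) is accepted, receiveAndDelete() returns the message, and a second send("Test_Message", null, 1000) one second later is dropped because the hash of the body is still inside the 5-minute window. Passing any explicit ID that has not been used recently makes the repost go through.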

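The queue in the question is created with ContentBasedDeduplication set to true and the message is sent without an explicit ID, so SQS derives the deduplication ID from a SHA-256 hash of the message body. Reposting the identical body within 5 minutes therefore reuses the same ID, and the repost is accepted but not delivered, even though the original message was already deleted. Therefore, you should supply a different message deduplication ID each time the message is placed back onto the queue.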
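One way to get a fresh ID per repost is a random UUID. The helper below is a small sketch in plain Java; RepostDedupIds and next are hypothetical names, not part of the AWS SDK. It assumes the documented 128-character limit on MessageDeduplicationId.

```java
import java.util.UUID;

// Hypothetical helper for generating deduplication IDs; not part of the AWS SDK.
class RepostDedupIds {
    // SQS limits MessageDeduplicationId to 128 characters.
    static final int MAX_LENGTH = 128;

    // Builds an ID that differs on every call, for example "repost-<random UUID>".
    static String next(String prefix) {
        String id = prefix + "-" + UUID.randomUUID();
        if (id.length() > MAX_LENGTH) {
            throw new IllegalArgumentException(
                    "deduplication ID is longer than " + MAX_LENGTH + " characters");
        }
        return id;
    }
}
```

In the question's sendMessage method, this would be passed by chaining .withMessageDeduplicationId(RepostDedupIds.next("repost")) onto the SendMessageRequest; an explicit ID takes precedence over the content-based one. Note the trade-off: a random ID per call means a retried send of the same repost is no longer deduplicated, so if that matters, an ID derived from something stable per pass (such as a cycle counter) may be a better fit.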

