【问题标题】:Am I sending and receiving messages using AWS SQS FIFO queue correctly?我是否正确使用 AWS SQS FIFO 队列发送和接收消息?
【发布时间】:2020-08-07 14:33:19
【问题描述】:

我正在调查 AWS SQS FIFO 队列并构建一些原型。但是,我很难掌握如何提取已发送的特定消息。 现在有几个问题:

  1. 我了解到,通过ReceiveMessageAsync 调用,将返回消息列表。我是否应该遍历此列表并将MessageId 属性与我发送的原始消息的属性相匹配?
  2. 如果队列中有未处理的消息列表,假设是 15 条消息,并且我发送了一条新消息,我的消息是否仅在从队列中删除至少 6 条消息时才返回 ReceiveMessageAsync

目前在我的原型中,我执行SendMessageAsync 请求,然后立即执行ReceiveMessageAsync 以获取处理后的消息。正是在这里,我循环接收到的 messageIds 列表以获取我的消息,对消息执行一些逻辑,然后请求从队列中删除消息。这个逻辑对吗?

var sqsClient = new AmazonSQSClient(RegionEndpoint.EUWest1);
            var sendQueueUrl = $"{ConfigurationManager.AppSettings["AWSServer"]}{ConfigurationManager.AppSettings["SQSSend"]}";
            var deduplicationId = "fc1e026d-4a04-4cdf-b0b0-16bc78dde19c"; //Guid.NewGuid().ToString();

        var sqsMessageRequest = new SendMessageRequest
        {
            QueueUrl = sendQueueUrl,
            MessageGroupId = "testGroup",
            MessageDeduplicationId = deduplicationId,
            MessageBody = "{\"message\":\"hello\"}"
        };
        try
        {
            var sendMessageResponse = await sqsClient.SendMessageAsync(sqsMessageRequest);

            var receiveQueueUrl = $"{ConfigurationManager.AppSettings["AWSServer"]}{ConfigurationManager.AppSettings["SQSReceive"]}";
            var receiveMessageRequest = new ReceiveMessageRequest
            {
                AttributeNames = { "All" },
                MaxNumberOfMessages = 10,
                MessageAttributeNames = { "All" },
                QueueUrl = receiveQueueUrl,
                WaitTimeSeconds = 20
            };

            bool messagesFound = false;

            while (!messagesFound)
            {
                var receiveMessageResponse = await sqsClient.ReceiveMessageAsync(receiveMessageRequest);

                if (receiveMessageResponse.HttpStatusCode != System.Net.HttpStatusCode.OK)
                    Console.WriteLine("Failed request to receive message\n");
                else
                {
                    foreach (var message in receiveMessageResponse.Messages)
                    {
                        if (message.MessageId != sendMessageResponse.MessageId)
                            continue;

                        messagesFound = true;
                        /*process message further and delete afterwards*/
                        var deleteMessageRequest = new DeleteMessageRequest($"{ConfigurationManager.AppSettings["AWSServer"]}{ConfigurationManager.AppSettings["SQSReceive"]}", message.ReceiptHandle);
                        var deleteMessageResponse = await sqsClient.DeleteMessageAsync(deleteMessageRequest);
                    }
                }
            }
        }
        catch (Exception ex)
        {
            throw new Exception("SendMessageAsync: " + ex.Message);
        }
        finally
        {
            sqsClient.Dispose();
        }

【问题讨论】:

  • 为什么您要“提取已发送的特定消息”?这不符合使用队列的正常方式。您能告诉我们更多关于您的整体流程以及为什么可能需要特定信息的信息吗?
  • 因此,对于我的原型,我希望在发送后立即从队列中接收更新的消息。有一个消费者在他们这边处理和更新消息,然后用处理后的消息更新队列,之后我想显示消息。
  • 您能否提供有关您系统总体情况的信息? “他们身边”是什么意思?是否会有大量消息发送到队列进行处理?如果您想要处理特定消息,而不是仅仅将下一条消息从队列中取出,为什么要使用队列?如果您可以详细说明您的最终目标(而不是实施细节),我们或许可以提供替代解决方案。
  • 系统上的用户执行了一些详细信息的更新,这会向队列发送“更新”消息(用户正在等待响应以继续)。该消息由另一个系统接收,该系统需要执行数据更新并将更新的消息提供回生产者。该更新消息可以包含任何类型的信息(失败/成功更新指示、新信息等)。生产者系统接收到特定的“更新”消息,用户可以继续使用系统。

标签: amazon-web-services amazon-sqs


【解决方案1】:

如果这是您的目标,则循环遍历队列以查找特定项目对于 FIFO 队列中的 SQS 效率低下。

您实际上会更好地查看Virtual Queues,它将提供一种使用 SQS 执行发送和接收的一对一映射的方法。

通过使用此功能,您可以为您的特定消息定义一个虚拟队列,消费者将能够处理所有消息(或技术上来自特定虚拟队列的消费者)。

虽然 FIFO 和标准 SQS 队列都需要处理队列中的所有消息,但一次最多只能处理 10 条消息。这使得尝试查找您的特定消息变得非常低效。

【讨论】:

  • 当然,我明白了。您确实是正确的,应该处理消息,每个消息都有自己的。
  • 没问题,希望对您有所帮助:)
猜你喜欢
  • 2018-08-29
  • 1970-01-01
  • 2018-09-26
  • 2018-12-31
  • 1970-01-01
  • 1970-01-01
  • 2017-12-06
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多