【发布时间】:2020-08-07 14:33:19
【问题描述】:
我正在调查 AWS SQS FIFO 队列并构建一些原型。但是,我很难掌握如何提取已发送的特定消息。 现在有几个问题:
- 我了解到,通过
ReceiveMessageAsync调用,将返回消息列表。我是否应该遍历此列表并将MessageId属性与我发送的原始消息的属性相匹配? - 如果队列中有未处理的消息列表,假设是 15 条消息,并且我发送了一条新消息,我的消息是否仅在从队列中删除至少 6 条消息时才返回
ReceiveMessageAsync?
目前在我的原型中,我执行SendMessageAsync 请求,然后立即执行ReceiveMessageAsync 以获取处理后的消息。正是在这里,我循环接收到的 messageIds 列表以获取我的消息,对消息执行一些逻辑,然后请求从队列中删除消息。这个逻辑对吗?
var sqsClient = new AmazonSQSClient(RegionEndpoint.EUWest1);
var sendQueueUrl = $"{ConfigurationManager.AppSettings["AWSServer"]}{ConfigurationManager.AppSettings["SQSSend"]}";
var deduplicationId = "fc1e026d-4a04-4cdf-b0b0-16bc78dde19c"; //Guid.NewGuid().ToString();
var sqsMessageRequest = new SendMessageRequest
{
QueueUrl = sendQueueUrl,
MessageGroupId = "testGroup",
MessageDeduplicationId = deduplicationId,
MessageBody = "{\"message\":\"hello\"}"
};
try
{
var sendMessageResponse = await sqsClient.SendMessageAsync(sqsMessageRequest);
var receiveQueueUrl = $"{ConfigurationManager.AppSettings["AWSServer"]}{ConfigurationManager.AppSettings["SQSReceive"]}";
var receiveMessageRequest = new ReceiveMessageRequest
{
AttributeNames = { "All" },
MaxNumberOfMessages = 10,
MessageAttributeNames = { "All" },
QueueUrl = receiveQueueUrl,
WaitTimeSeconds = 20
};
bool messagesFound = false;
while (!messagesFound)
{
var receiveMessageResponse = await sqsClient.ReceiveMessageAsync(receiveMessageRequest);
if (receiveMessageResponse.HttpStatusCode != System.Net.HttpStatusCode.OK)
Console.WriteLine("Failed request to receive message\n");
else
{
foreach (var message in receiveMessageResponse.Messages)
{
if (message.MessageId != sendMessageResponse.MessageId)
continue;
messagesFound = true;
/*process message further and delete afterwards*/
var deleteMessageRequest = new DeleteMessageRequest($"{ConfigurationManager.AppSettings["AWSServer"]}{ConfigurationManager.AppSettings["SQSReceive"]}", message.ReceiptHandle);
var deleteMessageResponse = await sqsClient.DeleteMessageAsync(deleteMessageRequest);
}
}
}
}
catch (Exception ex)
{
throw new Exception("SendMessageAsync: " + ex.Message);
}
finally
{
sqsClient.Dispose();
}
【问题讨论】:
-
为什么您要“提取已发送的特定消息”?这不符合使用队列的正常方式。您能告诉我们更多关于您的整体流程以及为什么可能需要特定信息的信息吗?
-
因此,对于我的原型,我希望在发送后立即从队列中接收更新的消息。有一个消费者在他们这边处理和更新消息,然后用处理后的消息更新队列,之后我想显示消息。
-
您能否提供有关您系统总体情况的信息? “他们身边”是什么意思?是否会有大量消息发送到队列进行处理?如果您想要处理特定消息,而不是仅仅将下一条消息从队列中取出,为什么要使用队列?如果您可以详细说明您的最终目标(而不是实施细节),我们或许可以提供替代解决方案。
-
系统上的用户执行了一些详细信息的更新,这会向队列发送“更新”消息(用户正在等待响应以继续)。该消息由另一个系统接收,该系统需要执行数据更新并将更新的消息提供回生产者。该更新消息可以包含任何类型的信息(失败/成功更新指示、新信息等)。生产者系统接收到特定的“更新”消息,用户可以继续使用系统。
标签: amazon-web-services amazon-sqs