【问题标题】:How do I get All the mesages from the SQS queue如何从 SQS 队列中获取所有消息
【发布时间】:2019-02-13 13:24:29
【问题描述】:

我正在使用 SQS 读取数据。但我不确定如何从队列中读取所有数据。

public List<Customer> getMessage() {


 int numberOfMessages= getMessageCount();
 System.out.println(numberOfMessages);
 int count=0;
 while(count<10) {

     System.out.println("Messages remaining in the queue- 
 >>>"+numberOfMessages);
     System.out.println("Recieving Messages from the Queue: ");
        final ReceiveMessageRequest receiveMessageRequest =
        new ReceiveMessageRequest(queueURL)
        .withMaxNumberOfMessages(10)
        .withWaitTimeSeconds(20);

        final List<com.amazonaws.services.sqs.model.Message> customers =   
                  sqs.receiveMessage(receiveMessageRequest).getMessages();


        for(com.amazonaws.services.sqs.model.Message cust: customers) {
            System.out.println("Current message number->>>>>"+(count+1));
            System.out.println(cust.getBody());
            sqs.deleteMessage(new DeleteMessageRequest(queueURL,
                    cust.getReceiptHandle()));
            count++;
        }

        //numberOfMessages=getMessageCount();

 }
 return null;

 }

public int getMessageCount() {
    Set<String> attrs = new HashSet<String>();
     attrs.add("ApproximateNumberOfMessages");
     CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName("sampleQueueSharma");
     GetQueueAttributesRequest a = new GetQueueAttributesRequest().withQueueUrl(sqs.createQueue(createQueueRequest).getQueueUrl()).withAttributeNames(attrs);
     Map<String,String> result = sqs.getQueueAttributes(a).getAttributes();
     int num = Integer.parseInt(result.get("ApproximateNumberOfMessages"));
     return num;
    }

我正在以这种方式读取数据,但这似乎不对。

我还尝试将 while(count&lt;10) 替换为 while(numberOfMessages&gt;0) 并取消注释 numberOfMessages=getMessageCount() 这一行,但这样做,代码会无限期地运行。似乎它总是返回一个 值大于 1。

有人可以帮我解决这个问题吗?

【问题讨论】:

  • 您的程序似乎没有对消息的内容做任何事情——它只是在删除内容。这是故意的吗?您实际上想用代码完成什么?如果您只想清空现有队列,最简单的方法是使用PurgeQueue() 或通过控制台清空队列。
  • 嘿@JohnRotenstein。实际上,我想从中创建一个 Lambda 函数,该函数将从 sqs 获取数据并将其发送到休息服务。暂时我只是想从sqs中获取数据并打印出来。

标签: java amazon-web-services spring-boot amazon-sqs aws-java-sdk


【解决方案1】:

首先要注意几点:

  1. 使用count 时,您只能阅读大约 10 条消息(由于批处理,可能会稍微多一些)。您可能不想在简单的概念验证阶段之外使用它
  2. 使用while (numberOfMessages &gt; 0),只要SQS 的消息计数近似值表明它有消息,您就可以继续阅读。请注意,这是一个近似值,因此您不应依赖它是精确的(它最终会保持一致)。
  3. 您的getMessageCount() 方法看起来像是在每次调用它时都在尝试重新创建队列——虽然这会起作用,但您不需要这样做。创建一次即可使用。

根据我看到的代码,getMessageCount() 将返回 &gt; 1,如果 (a) 您只是有大量消息,(b) 其他东西不断向队列中添加消息(或 (c) 如果您'没有正确删除它们,但你正在这样做)。

我建议对您的代码进行以下修改:

  1. 每次调用getMessageCount() 时记录其结果。如果您将消息放入队列的速度快于处理它们的速度,或者有一些永远不会用完的消息源,这将告诉您。
  2. 记录您的ReceiveMessageRequest 收到的消息数量。这会让您知道您确实在处理消息。
  3. 不要让您的控制流基于getMessageCount() 的值,而是继续调用直到ReceiveMessageRequest 结果(带有waitTimeSeconds=20)返回0 条消息 - 这是您的队列在那个时刻为空的保证(而不是近似值)。

【讨论】:

    猜你喜欢
    • 2021-03-30
    • 1970-01-01
    • 2019-11-07
    • 2013-12-13
    • 2014-10-10
    • 2012-04-28
    • 2021-03-16
    • 2019-09-19
    • 1970-01-01
    相关资源
    最近更新 更多