【问题标题】:Get all messages from AWS SQS in NodeJS从 NodeJS 中的 AWS SQS 获取所有消息
【发布时间】:2019-09-19 21:56:35
【问题描述】:

我有以下从 aws SQS 获取消息的函数,问题是我一次获取一条消息,我希望获取所有消息,因为我需要检查每条消息的 ID:

function getSQSMessages() {

    const params = {
        QueueUrl: 'some url',
    };

    sqs.receiveMessage(params, (err, data) => {
        if(err) {
            console.log(err, err.stack)
            return(err);
        }
        return data.Messages;
    });

};

function sendMessagesBack() {

    return new Promise((resolve, reject) => {
        if(Array.isArray(getSQSMessages())) {
            resolve(getSQSMessages());
        } else {
            reject(getSQSMessages());
        };
    });

};

函数 sendMessagesBack() 在另一个 async/await 函数中使用。 我不确定如何获取所有消息,因为我正在研究如何获取它们,人们提到了循环,但我不知道如何在我的情况下实现它。 我假设我必须将 sqs.receiveMessage() 放入一个循环中,但后来我对需要检查什么以及何时停止循环感到困惑,以便我可以获得每条消息的 ID?

如果有人有任何提示,请分享。 谢谢。

【问题讨论】:

    标签: node.js amazon-web-services amazon-sqs


    【解决方案1】:

    我建议你使用 Promise api,它可以让你立即使用 async/await 语法。

    const { Messages } = await sqs.receiveMessage(params).promise();
    // Messages will contain all your needed info
    
    await sqs.sendMessage(params).promise();
    

    这样,你就不需要用 Promises 包装回调 API。

    【讨论】:

      【解决方案2】:

      SQS 在响应中返回的消息不超过 10 条。要获取所有可用消息,您需要递归调用 getSQSMessages 函数。 如果你从 getSQSMessages 返回一个承诺,你可以这样做。

      getSQSMessages()
      .then(data => {
        if(!data.Messages || data.Messages.length === 0){
            // no messages are available. return
        }
        // continue processing for each message or push the messages into array and call 
       //getSQSMessages function again. 
      });
      

      【讨论】:

        【解决方案3】:

        您永远无法保证获取队列中的所有消息,除非在获取其中一些消息后,将它们从队列中删除 - 从而确保下一个请求返回不同的记录选择。

        每个请求将返回“最多”10 条消息,如果您不删除它们,那么很有可能下一个“最多”10 条消息的请求将返回您已经看过的混合消息,并且一些新的 - 所以你永远不会知道你什么时候都看过了。

        也许队列不是在您的用例中使用的正确工具 - 但由于我不了解您的用例,所以很难说。

        【讨论】:

        • 这解释了我的 finctions 的“奇怪”行为,我想知道为什么即使我指定了要返回的最大消息数,我仍然收到不同数量的消息。
        【解决方案4】:

        我知道这有点像死灵,但我昨晚登陆这里,试图从 SQS 的死信队列中提取一些所有消息。虽然接受的答案,“你不能保证从队列中获取所有消息”是绝对正确的.

        依赖关系

        在我的情况下,我的项目中已经有一些依赖项,我过去常常使生活变得更简单。

        • lodash - 这是我们在代码中用来帮助实现功能的东西。我不认为我在下面使用了它,但我将它包括在内,因为它在文件中。
        • cli-progress - 这为您的 CLI 提供了一个不错的小进度条。

        免责声明

        在对与其他系统集成的一些生产错误进行故障排除时,将以下内容汇总在一起。我们的 DLQ 消息包含一些我需要的标识符,以便制定云观察查询以进行故障排除。鉴于这是 AWS 中两个不同的 GUI,来回切换很麻烦,因为我们的 AWS 会话是通过一种联合形式进行的,并且会话最多只能持续一个小时。

        脚本

        #!/usr/bin/env node
        
        const _ = require('lodash');
        const aswSdk = require('aws-sdk');
        const cliProgress = require('cli-progress');
        
        const queueUrl = 'https://[put-your-url-here]';
        const queueRegion = 'us-west-1';
        
        const getMessages = async (sqs) => {
          const resp = await sqs.receiveMessage({
            QueueUrl: queueUrl,
            MaxNumberOfMessages: 10,
          }).promise();
        
          return resp.Messages;
        };
        
        const main = async () => {
          const sqs = new aswSdk.SQS({ region: queueRegion });
        
          // First thing we need to do is get the current number of messages in the DLQ. 
          const attributes = await sqs.getQueueAttributes({
            QueueUrl: queueUrl,
            AttributeNames: ['All'], // Probably could thin this down but its late
          }).promise();
        
          const numberOfMessage = Number(attributes.Attributes.ApproximateNumberOfMessages);
        
          // Next we create a in-memory cache for the messages
          const allMessages = {};
          let running = true;
        
          // Honesty here: The examples we have in existing code use the multi-bar. It was about 10PM and I had 28 DLQ messages I was looking into. I didn't feel it was worth converting the multi-bar to a single-bar. Look into the docs on the github page if this is really a sticking point for you.
          const progress = new cliProgress.MultiBar({
            format: ' {bar} | {name} | {value}/{total}',
            hideCursor: true,
            clearOnComplete: true,
            stopOnComplete: true
          }, cliProgress.Presets.shades_grey);
          const progressBar = progress.create(numberOfMessage, 0, { name: 'Messages' });
        
          // TODO: put in a time limit to avoid an infinite loop. 
          // NOTE: For 28 messages I managed to get them all with this approach in about 15 seconds. When/if I cleanup this script I plan to add the time based short-circuit at that point.
          while (running) {
            // Fetch all the messages we can from the queue. The number of messages is not guaranteed per the AWS documentation. 
            let messages = await getMessages(sqs);
            for (let i = 0; i < messages.length; i++) {
              // Loop though the existing messages and only copy messages we have not already cached.
              let message = messages[i];
              let data = allMessages[message.MessageId];
              if (data === undefined) {
                allMessages[message.MessageId] = message;
              }
            }
        
            // Update our progress bar with the current progress
            const discoveredMessageCount = Object.keys(allMessages).length;
            progressBar.update(discoveredMessageCount);
        
            // Give a quick pause just to make sure we don't get rate limited or something
            await new Promise((resolve) => setTimeout(resolve, 1000));
            running = discoveredMessageCount !== numberOfMessage;
          }
        
          // Now that we have all the messages I printed them to console so I could copy/paste the output into LibreCalc (excel-like tool). I split on the semicolon for rows out of habit since sometimes similar scripts deal with data that has commas in it.
          const keys = Object.keys(allMessages);
          console.log('Message ID;ID');
          for (let i = 0; i < keys.length; i++) {
            const message = allMessages[keys[i]];
            const decodedBody = JSON.parse(message.Body);
            console.log(`${message.MessageId};${decodedBody.id}`);
          }
        };
        
        main();
        

        【讨论】:

          猜你喜欢
          • 2021-03-30
          • 2021-10-11
          • 1970-01-01
          • 2020-08-22
          • 1970-01-01
          • 2017-05-29
          • 2021-01-15
          • 2017-06-09
          • 1970-01-01
          相关资源
          最近更新 更多