【问题标题】:Amazon SQS with aws-sdk receiveMessage Stall带有 aws-sdk receiveMessage Stall 的 Amazon SQS
【发布时间】:2016-09-03 19:58:26
【问题描述】:

我正在使用 aws-sdk 节点模块和(据我所知)批准的方式来轮询消息。

基本上总结为:

        sqs.receiveMessage({
            QueueUrl: queueUrl,
            MaxNumberOfMessages: 10,
            WaitTimeSeconds: 20
        }, function(err, data) {
            if (err) {
                logger.fatal('Error on Message Recieve');
                logger.fatal(err);
            } else {
                // all good
                if (undefined === data.Messages) {
                    logger.info('No Messages Object');
                } else if (data.Messages.length > 0) {
                    logger.info('Messages Count: ' + data.Messages.length);

                    var delete_batch = new Array();
                    for (var x=0;x<data.Messages.length;x++) {
                        // process
                        receiveMessage(data.Messages[x]);

                        // flag to delete

                        var pck = new Array();
                        pck['Id'] = data.Messages[x].MessageId;
                        pck['ReceiptHandle'] = data.Messages[x].ReceiptHandle;

                        delete_batch.push(pck);
                    }

                    if (delete_batch.length > 0) {
                        logger.info('Calling Delete');
                        sqs.deleteMessageBatch({
                            Entries: delete_batch,
                            QueueUrl: queueUrl
                        }, function(err, data) {
                            if (err) {
                                logger.fatal('Failed to delete messages');
                                logger.fatal(err);
                            } else {
                                logger.debug('Deleted recieved ok');
                            }
                        });
                    }
                } else {
                    logger.info('No Messages Count');
                }
            }
        });

receiveMessage 是我的“如果我收集到足够多的消息,就处理收集到的消息”功能

有时,我的脚本会停止,因为我根本没有收到亚马逊的响应,例如队列中没有要消费的消息,而不是点击 WaitTimeSeconds 并发送“无消息对象”,未调用回调。

(我正在向 Amazon Weirdness 写这篇文章)

我要问的是检测和处理这个问题的最佳方法是什么,因为我有一些代码可以阻止对 receiveMessage 的并发调用。

这里的建议答案:Nodejs sqs queue processor 也有代码可以防止并发消息请求查询(假设它一次只获取一条消息)

我确实把整个东西都包在里面了

var running = false;
runMonitorJob = setInterval(function() {
    if (running) {
    } else {
        running = true;
        // call SQS.receive
    }
}, 500);

(在删除循环之后运行 = false(不在它的回调中))

我的解决方案是

watchdogTimeout = setTimeout(function() {
    running = false;
}, 30000);

但这肯定会留下一堆浮动的sqs.receive,随着时间的推移会留下很多内存吗?

(这个作业一直在运行,我在星期五让它运行,它在星期六早上停止并挂起,直到我今天早上手动重新启动作业)

编辑:我见过它挂起约 5 分钟然后突然收到消息 BUT 等待时间为 20 秒的情况,它应该在 20 秒后抛出“无消息”。所以约 10 分钟的 WatchDog 可能更实用(取决于其余的业务逻辑)

编辑:是长轮询已配置队列端。

编辑:这是aws-sdk 的(最新)v2.3.9 和 NodeJS v4.4.4

【问题讨论】:

  • 运行 crontab 是否有任何限制,即在运行接收消息之前运行另一个进程来“get-queue-attributes”?
  • 我想 SQS 队列中的消息数会被备份。 “get-queue-attributes”会做什么?
  • 如果您习惯了 AWS 命名,您可以交叉引用该特定语言 API 的命名约定。 docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/… 从它返回的元数据之一是“ApproximateNumberOfMessages”。
  • 我要问的是检测和处理这个问题的最佳方法是什么 ...似乎您想要进行一些数据包嗅探并查明真相它,而不是容纳它。这是不可接受或预期的行为。 http 请求无限期挂起甚至不应该是可能。应该有 TCP 超时,或发生某事
  • 谢谢两位,会进一步调查

标签: node.js amazon-web-services amazon-sqs


【解决方案1】:

我也遇到了这个问题,但不是在调用receiveMessage而是sendMessage时。我还看到正好 120 秒的挂断。我还看到了其他一些服务,比如 Firehose。

这导致我在 AWS 开发工具包中找到这一行:

SQS Constructor

httpOptions:

  • timeout [Integer] — 将套接字设置为在套接字不活动超时毫秒后超时。默认为两分钟 (120000)。

为了实施修复,我覆盖了我的 SQS 客户端的超时,该客户端执行 sendMessage 以在 10 秒后超时,另一个 25 秒用于接收(我长轮询 20 秒):

var sendClient    = new AWS.SQS({httpOptions:{timeout:10*1000}});
var receiveClient = new AWS.SQS({httpOptions:{timeout:25*1000}});

我已经将它投入生产一周了,我注意到我的所有 SQS 停滞问题都已消除。

【讨论】:

    【解决方案2】:

    几天来我一直在关注这个(或类似的)问题,这就是我注意到的:

    • receiveMessage 调用最终会返回,尽管仅在 120 秒后才返回

    • 对 receiveMessage 的并发调用由 AWS.SDK 库序列化,因此并行进行多个调用无效。

    • receiveMessage 回调没有出错 - 实际上在 120 秒过去后,它可能包含消息。

    对此可以做些什么?发生这种事情的原因有很多,其中一些/许多事情不一定能解决。答案是运行多个服务,每个服务都调用 receiveMessage 并在消息到来时对其进行处理——SQS 支持这一点。在任何时候,其中一项服务可能会达到 120 秒的延迟,但其他服务应该能够正常继续。

    我的特殊问题是我有一些关键的单例服务无法承受 120 秒的停机时间。为此,我将研究 1) 使用 HTTP 而不是 SQS 将消息推送到我的服务中,或者 2) 在每个单例周围生成从属进程以从 SQS 获取消息并将它们推送到服务中。

    【讨论】:

      猜你喜欢
      • 2022-07-22
      • 2014-09-04
      • 2011-12-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-07-25
      • 2017-12-16
      • 1970-01-01
      相关资源
      最近更新 更多