【问题标题】:Processing SQS queues with boto使用 boto 处理 SQS 队列
【发布时间】:2012-06-23 07:23:41
【问题描述】:

我有一个使用 ec2 实例上的 boto 库的 python 脚本,它是自动缩放组的一部分。该脚本处理来自 SQS 队列的消息:

import boto
from boto.sqs.message import Message

conn = boto.connect_sqs()
q = conn.create_queue('queue-name')

while (qin.count() > 0):
    m = q.get_messages()
    #do something with the message

使用 while 语句有意义吗? count() 是否实时更新为:

  1. 其他实例将消息从队列中取出(或者我要加倍)
  2. 新消息被添加到队列中(或者我会错过它们吗?)

如何让这个脚本不断地监听队列中的新增内容,即使队列是空的?

在这个问题Processing items in SQS queue with a php script 中提到'sqs ruby​​ 客户端库有一个方法“poll”,它不断地轮询队列,并在接收到队列中的消息时将其传递给一个块'。 Python中是否有等价物?

也有人建议可以使用 SNS 来通知脚本消息队列状态,但我看不出如何使用 SNS 配置响应式系统,因为指标警报的粒度不够细。

【问题讨论】:

标签: python amazon-web-services boto amazon-sqs


【解决方案1】:

例子:

# wait_time_seconds count only 1 request in x seconds (0 - 20)
# num_messages get x messages in same request (1 - 10)
while 1:
    logger.info("... waiting messages ...")
    messages = queue_in.get_messages(wait_time_seconds=20, num_messages=10)
    for message in messages:
        logger.info('message: %s' % (message,))
        queue_in.delete_message(message)

【讨论】:

    【解决方案2】:
    1. 当您从 SQS 中提取消息时,该消息将变得不可见,并且其他队列查询无法访问(编辑 - 不可见性可以设置在 0 到 12 小时之间)。
    2. 每次添加新消息时,您都必须重新获取队列,但这应该不是问题 - 这就是队列服务首先存在的原因。

    如果您想不断轮询队列,请尝试所谓的Long Polling - 您可以进行长达 20 秒的连续轮询,当队列被填充时返回。

    希望对您有所帮助,否则请戳 boto sqs documentation

    【讨论】:

    • 消息不可见性默认为 30 秒,而不是 4 天。而且这个时间段是可以修改的。
    【解决方案3】:

    您不应依赖队列的计数,因为它仅用于提供近似计数,不能保证准确。

    如果您想永远保持轮询,请执行以下操作:

    while 1:
        messages = q.get_messages()
        # do something with messages
        time.sleep(N)
    

    我添加了对 time.sleep 的调用以在循环中引入延迟。 N 的值应该至少为一秒,并且可能更多,这取决于您期望新消息出现在队列中的速度。如果您不在循环中设置某种延迟,您可能会开始受到服务的限制。

    为避免消息被多次读取,您应该尝试将队列的可见性超时调整为大于处理消息所需时间的值,然后确保在处理完成后删除消息.

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2013-07-16
      • 2018-03-05
      • 2016-10-07
      • 2018-12-31
      • 1970-01-01
      • 1970-01-01
      • 2018-09-10
      • 1970-01-01
      相关资源
      最近更新 更多