【问题标题】:Python Multiprocessing Reading from SQS从 SQS 读取 Python 多处理
【发布时间】:2017-08-08 11:16:12
【问题描述】:

我有以下代码启动多个 python 进程,这些进程不断从 SQS 队列轮询。

进程通过

启动
num_processes = range(1, 9)

    for p_num in num_processes:
        p = multiprocessing.Process(
            target=sqs_polling, args=(queue_name, p_num,))
        p.start()

而实际的轮询函数是

def sqs_polling(queue_name, process_id):

    sqs = boto3.resource('sqs', region_name='us-east-1')
    queue = sqs.get_queue_by_name(QueueName=queue_name)

    no_messages = False

    # poll sqs forever
    while 1:

        # polling delay so aws does not throttle us
        sleep(2.0)

        # sleep longer if there are no messages on the queue the last time it was polled
        if no_messages:
            sleep(900.0)

        message_batch = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=20)

        if len(message_batch) == 0:
            no_messages = True
        else:
            no_messages = False

        # process messages
        for message in message_batch:
            do_something(message)
            message.delete()

这似乎工作了几个小时,但最终似乎 SQS 会限制进程并且即使它们存在于队列中也无法读取任何消息。为了帮助减少这种情况,我在队列读取之间设置了 2 秒的超时时间。如果没有读取消息,我还创建了 15 分钟超时。尽管如此,我仍然会节流。谁能解释为什么节流仍然在这里发生?另一种可能性可能是与队列的连接变得陈旧,但我认为这不太可能。

【问题讨论】:

  • 不需要sleep(2.0)。除非您实际上收到指示节流的错误(您没有提到),否则这不是正在发生的事情——我的 SQS 代码根本没有休眠。队列中的消息,如控制台主页所示......它们是“可用”还是“正在运行”?另外,message.delete() 不返回值吗?队列的 CloudWatch 计数器显示什么?
  • 由于没有错误,我想它没有节流。实际上根本没有错误。几个小时后,这些进程实际上会停止做任何事情(即使它们处于应该不断轮询的 while 循环中)。完全不知道发生了什么。
  • 虽然我使用了许多 AWS 服务,但没有一个是使用 Python 的……但如果一切都失败了,请添加更多日志记录。在循环的每次迭代中记录时间和进程 ID。他们还在投票吗?他们在跑吗?他们挂了吗?终止?
  • 我在 while 循环中有日志。所以它必须挂在那个循环的某个地方。他们还没有投票。很郁闷
  • 我在这里看不到任何会导致发生日志记录的内容。在message_batch = queue.receive_messages... 之前和之后立即添加一些内容...之前的“开始投票”和之后的“完成的投票”,以及时间戳和进程ID。如果它每 20 秒正确记录“开始”和“完成”,那么你有理由怀疑 SQS 有问题……但如果它停止记录,最后记录的任何一条消息都会告诉你在循环中的位置,事情发生了错了。

标签: python amazon-web-services multiprocessing message-queue amazon-sqs


【解决方案1】:

这个问题有点过时了,但我刚刚发布了multi_sqs_listener,它提供了一种高级、多线程的方式来从 Python 代码中侦听多个 SQS 队列。

import time
from multi_sqs_listener import QueueConfig, EventBus, MultiSQSListener


class MyListener(MultiSQSListener):
    def low_priority_job(self, message):
        print('Starting low priority, long job: {}'.format(message))
        time.sleep(5)
        print('Ended low priority job: {}'.format(message))
    def high_priority_job(self, message):
        print('Starting high priority, quick job: {}'.format(message))
        time.sleep(.2)
        print('Ended high priority job: {}'.format(message))
    def handle_message(self, queue, bus, priority, message):
        if bus == 'high-priority-bus':
            self.high_priority_job(message.body)
        else:
            self.low_priority_job(message.body)

low_priority_bus = EventBus('low-priority-bus', priority=1)
high_priority_bus = EventBus('high-priority-bus', priority=5)
EventBus.register_buses([low_priority_bus, high_priority_bus])

low_priority_queue = QueueConfig('low-priority-queue', low_priority_bus)
high_priority_queue = QueueConfig('high-priority-queue', high_priority_bus)
my_listener = MyListener([low_priority_queue, high_priority_queue])
my_listener.listen()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-06-21
    • 1970-01-01
    • 1970-01-01
    • 2015-07-23
    • 2013-05-06
    • 2016-03-24
    • 2020-01-25
    相关资源
    最近更新 更多