【发布时间】:2017-08-08 11:16:12
【问题描述】:
我有以下代码启动多个 python 进程,这些进程不断从 SQS 队列轮询。
进程通过
启动num_processes = range(1, 9)
for p_num in num_processes:
p = multiprocessing.Process(
target=sqs_polling, args=(queue_name, p_num,))
p.start()
而实际的轮询函数是
def sqs_polling(queue_name, process_id):
sqs = boto3.resource('sqs', region_name='us-east-1')
queue = sqs.get_queue_by_name(QueueName=queue_name)
no_messages = False
# poll sqs forever
while 1:
# polling delay so aws does not throttle us
sleep(2.0)
# sleep longer if there are no messages on the queue the last time it was polled
if no_messages:
sleep(900.0)
message_batch = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=20)
if len(message_batch) == 0:
no_messages = True
else:
no_messages = False
# process messages
for message in message_batch:
do_something(message)
message.delete()
这似乎工作了几个小时,但最终似乎 SQS 会限制进程并且即使它们存在于队列中也无法读取任何消息。为了帮助减少这种情况,我在队列读取之间设置了 2 秒的超时时间。如果没有读取消息,我还创建了 15 分钟超时。尽管如此,我仍然会节流。谁能解释为什么节流仍然在这里发生?另一种可能性可能是与队列的连接变得陈旧,但我认为这不太可能。
【问题讨论】:
-
不需要
sleep(2.0)。除非您实际上收到指示节流的错误(您没有提到),否则这不是正在发生的事情——我的 SQS 代码根本没有休眠。队列中的消息,如控制台主页所示......它们是“可用”还是“正在运行”?另外,message.delete()不返回值吗?队列的 CloudWatch 计数器显示什么? -
由于没有错误,我想它没有节流。实际上根本没有错误。几个小时后,这些进程实际上会停止做任何事情(即使它们处于应该不断轮询的 while 循环中)。完全不知道发生了什么。
-
虽然我使用了许多 AWS 服务,但没有一个是使用 Python 的……但如果一切都失败了,请添加更多日志记录。在循环的每次迭代中记录时间和进程 ID。他们还在投票吗?他们在跑吗?他们挂了吗?终止?
-
我在 while 循环中有日志。所以它必须挂在那个循环的某个地方。他们还没有投票。很郁闷
-
我在这里看不到任何会导致发生日志记录的内容。在
message_batch = queue.receive_messages...之前和之后立即添加一些内容...之前的“开始投票”和之后的“完成的投票”,以及时间戳和进程ID。如果它每 20 秒正确记录“开始”和“完成”,那么你有理由怀疑 SQS 有问题……但如果它停止记录,最后记录的任何一条消息都会告诉你在循环中的位置,事情发生了错了。
标签: python amazon-web-services multiprocessing message-queue amazon-sqs