【问题标题】:Boto3: is there a way to gracefully interrupt an SQS long poll receive messages request?Boto3:有没有办法优雅地中断 SQS 长轮询接收消息请求?
【发布时间】:2026-01-22 23:25:01
【问题描述】:

我已经成功创建了一个使用长轮询从 SQS 队列中提取消息的方法,如下所示:

def dequeue_message(self, callback):
    result = self.queue.receive_messages(MaxNumberOfMessages=1)
    if len(result) != 0:
        body = result[0].body
        try:
            callback(body)
            result.delete()
        except Exception as e:
            print("message not dequeued because an error occurred"
                  "when running callback: " + str(e))

但我无法找到一种方法来停止轮询而不终止正在运行的 Python 进程(或者,显然,等待超时)。我能做什么?

【问题讨论】:

  • 如果在长轮询期间没有消息到达队列,它应该在 20 秒或更短的时间内返回一个空结果。这还不够吗?
  • 嗯,这取决于。具体是什么问题?预计用户中断 SQS 接收消息请求的频率是多少?也许对于许多人来说,等待 20 秒就足够了,但对于其他人来说,所花费的时间可能是一个问题,或者只是令人讨厌。我的问题集中在一个具体问题上,而不是这个问题在我的具体情况或大多数实际情况中是否真的很重要。

标签: python amazon-web-services boto amazon-sqs


【解决方案1】:

您可以简单地使用标志来执行此操作。假设一个按钮应该中断轮询过程。按下按钮后立即更新标志。然后,当轮询返回一组消息时,您检查标志,并忽略处理消息。不用担心,消息仍会在队列中。见:link

Amazon SQS 在收到消息后不会自动删除它 为您,以防您未成功收到消息(对于 例如,消费者可能会失败或失去连接)。删除一个 消息,您必须发送一个单独的请求,确认您 不再需要该消息,因为您已成功收到并且 处理它。

示例代码

# This is the flag to check for interruptions
is_interrupted = False

# This function will set the flag if an interruption is occurred
def interrupt_polling():
    is_interrupted = True


def dequeue_message(self, callback):
    result = self.queue.receive_messages(MaxNumberOfMessages=1)

    # this is the check that will bypass the Polling process
    # Handle this logic as required
    if is_interrupted:
        # assuming that this needs to be reset to False
        is_interrupted = False
        return

    if len(result) != 0:
        body = result[0].body
        try:
            callback(body)
            result.delete()
        except Exception as e:
            print("message not dequeued because an error occurred"
                  "when running callback: " + str(e))

希望这会有所帮助。

【讨论】:

  • 这并没有解决问题。是的,一旦设置了is_interrupted,您的循环最终会退出,但问题是如何立即从对receive_messages 的阻塞调用中中断,而不是等待它在超时后返回。
最近更新 更多