【问题标题】:Delete messages in a queue on a RabbitMQ (AMQP) server删除 RabbitMQ (AMQP) 服务器上队列中的消息
【发布时间】:2020-01-10 03:25:58
【问题描述】:

我有 1 个大任务,其中包含 200 个子任务(消息),它们将被发布到一个队列中。如果我想取消这 1 个任务,则应删除 200 条消息(或尚未处理的未处理消息)。有什么方法可以删除队列中这些已发布的消息?

我能想到的一个解决方案是创建一个队列 (Q),我在其中发布一个新队列 (X) 的名称。然后每个消费者连接到这个新的动态创建的队列 (X) 并处理 200 条已发布的消息。如果我想中止整个任务,我只从发布者端删除该队列 (X)。这是一种常见的方法吗?

【问题讨论】:

  • 听起来不错。
  • 数据库事务的类似方法怎么样。从初始启动消息开始,将其推送到队列中,然后插入剩余的消息。如果处理顺利,则插入最终提交消息,否则插入回滚或丢弃消息。确保开始消息、最终消息和丢弃/回滚消息在另一消费者端被唯一标识和处理。
  • 虽然对于这种需求。 Kafka 将是我的首选,因为它提供了消息队列中的事务功能。在这里,您可以获得将子任务消息发送到 Kafka 队列的功能。在您提交正在进行的事务之前,其他队列消费者不会读取主题。即使您不必担心删除中止事务中的消息。见Kafka Transactional Messaging

标签: python python-3.x rabbitmq amqp


【解决方案1】:

我发现您建议的方法没有什么问题。

第一个问题是由于 RMQ consumer prefetch,它旨在通过减少对代理的请求量来提高性能。如果您的消费者已经检索了一批任务,他们将在请求新任务之前将它们全部处理,然后他们才会意识到队列已被取消。因此,您的取消请求在大多数情况下都不会得到妥善处理。您可以将预取计数减少到 1 以避免这种副作用,但这会增加网络压力并降低整体速度。

第二个问题是 AMQP 协议没有提供优雅处理队列删除的机制。因此,您的消费者需要小心处理队列消失的问题,否则它们会崩溃。通过这样做,您将失去对错误和问题的可见性。如何区分队列何时被显式删除与实际崩溃的情况?

在这种情况下,我建议您使用其父作业的标识符标记您的所有任务。每次消费者开始消费一个新任务时,它都会检查父作业是否有效或已被取消。在后一种情况下,它会简单地忽略任务并移动到下一个任务。您需要为此提供支持服务。例如,一个 Redis 实例应该足够了。

这种机制将更加简单和强大。您可以根据需要旋转任意数量的消费者,而无需编排他们与正确队列的连接。乱序或交错的任务也不会成为问题。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2011-03-26
    • 1970-01-01
    • 2015-12-20
    • 1970-01-01
    • 2012-06-05
    • 1970-01-01
    • 1970-01-01
    • 2011-07-17
    相关资源
    最近更新 更多