【问题标题】:How to read RabbitMQ unacknowledged messages / RabbitMQ cycle如何读取 RabbitMQ 未确认消息 / RabbitMQ 循环
【发布时间】:2024-10-16 11:15:02
【问题描述】:

我想读取 RabbitMQ 队列中未确认消息的有效负载或 messageId。这可能吗?

我想这样做的原因是我试图使用 RabbitMQ 死信功能来建立一个循环来定期自动生成消息。 简单来说,创建两个队列——工作队列和延迟队列。

  1. 将延迟队列中消息的TTL设置为需要周期性的时间频率。对于不同的工作目的,可以有不同的 TTL 不同的消息;
  2. 将消息放入延迟队列。当消息过期时,它会重新发布到工作队列中。消息可以根据需要一直留在工作队列中,直到消费者准备好使用它。
  3. 一个消费者拿起消息并处理它。如果处理成功,消费者需要确认工作队列,然后将消息写回延迟队列;如果处理失败(例如,线程崩溃),则没有确认。然后消息将自动重新出现在工作队列中。然后另一个消费者可以接手这项工作。当发送回延迟队列的消息再次过期时,它会被重新发布,然后被消费者重新消费……构建一个循环,分配工作负载。

我想确保循环中没有丢失或重复的消息,因为我不希望丢失工作或同时做双重工作。但是,发生重复消息的可能性很小。下面展示了消费者首先将消息写回延迟队列,并确认工作队列。如果线程在两行以下之间崩溃,则消息将在延迟队列中,并且 Rabbit 再次将消息重新发布到工作队列中。最终在循环中出现重复的消息。

  channel.basicPublish(DELAY_EXCHANGE, "", null, message.getBytes());
  channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

为了防止出现上述情况,我想在上面两行之后添加一个看狗逻辑:

  1. 检查循环中的消息总数(两个队列中的总消息数),看是否等于我的预期数量(我预期的数字减去 10);

  2. 如果数字不匹配,我想弄清楚哪个是丢失的,哪个是重复的,然后处理它。我不关心这些消息的顺序,或者频率受到干扰,因为这是一个非常需要考虑的边缘情况。我可以轻松地检索那些准备好的消息并将它们重新排队。但问题是如何处理那些未确认的消息?

非常感谢您!

罗伊

【问题讨论】:

    标签: rabbitmq rabbitmqctl


    【解决方案1】:

    无法从其他上下文读取未确认的消息,原始消息已被消费并保留为未确认消息。

    【讨论】:

    • 非常感谢您的回答。我在我原来的问题上提供了更多细节。你能看看吗?再次感谢!
    • 当问题看起来像一个全新的问题时,改变这么多并不是一个好主意。
    • 关于每条消息的 TTL,请注意,无论每条消息的 TTL 是什么,它们都只会在到达队列头时从队列中移出(如果设置了 DLX,则将被删除)。
    • 另外,消息计数的想法可能行不通,因为很难预测多个消费者和发布者的行为。只需构建重复容忍流。为了进一步讨论,我建议在RabbitMQ official user group 上提出这个问题(先搜索类似问题)。
    • @zaq178miami 我正在尝试在我的本地主机上使用 RabbitMQ,但是很难使用它,你能给我任何关于这个的指示