【问题标题】:RabbitMQ stops delivering MQTT messages to consumer after consumer restarts消费者重启后,RabbitMQ 停止向消费者传递 MQTT 消息
【发布时间】:2023-11-22 05:25:01
【问题描述】:

我在 OpenShift 上运行了一个启用了 MQTT 插件的 RabbitMQ V3.6.10 实例。我有多个 Spring Boot 应用程序,Eclipse PAHO MQTT implementation 使用来自 RabbitMQ 队列的消息。所有消费者都使用MqttDefaultFilePersistence。持久性数据被写入具有 100M 配额的持久性卷上的目录/tmp/mqtt。这是我为连接到 RabbitMQ 队列而编写的代码:

    MqttConnectOptions options = new MqttConnectOptions();
    options.setUserName(username);
    options.setPassword(password.toCharArray());
    options.setCleanSession(false);
    options.setAutomaticReconnect(reconnect);
    MqttClient mqttClient = new MqttClient(serverUri, "MyConsumerApp", new MqttDefaultFilePersistence("/tmp/mqtt"));
    mqttClient.connect(options);

现在我认识到,由于某些原因,如果重新启动 cosumer 应用程序,它不会再使用任何消息。正如我在 RabbitMQ 管理控制台中看到的那样,队列中的消息正在填满。 RabbitMQ 日志中没有错误消息。在这种情况下,唯一可行的解​​决方案是在 RabbitMQ 管理控制台中删除订阅队列并再次重新启动消费者应用程序。也许有人遇到过类似的问题?有没有人暗示我这里发生了什么?也许 RabbitMQ 使用的 MQTT 插件和保存消息状态的 PAHO 实现存在问题。看起来 RabbitMQ 不知道在消费者断开连接后从哪里恢复,所以它只是停止传递消息。有点悲观的锁定... :-)

【问题讨论】:

    标签: java rabbitmq openshift mqtt


    【解决方案1】:

    您必须使用持久队列。如果填满队列是问题,那么您可以使用自动删除队列。但这带来了一个缺点,即尝试传递消息,如果没有消费者,则删除队列并丢失消息。

    对于您的情况,您应该查看死信交换 (https://www.rabbitmq.com/dlx.html)

    如果消费者被切断并在一段时间后重新连接,则可以从死信交换中提供未处理的消息。这样消息的顺序也不会丢失。

    【讨论】:

    • 持久队列?我必须手动创建队列吗?实际上队列是在消费者应用程序启动时创建的。队列的名称类似于“mqtt-subscription-xyz”,并自动绑定到交换“mqtt”。