【发布时间】:2023-11-22 05:25:01
【问题描述】:
我在 OpenShift 上运行了一个启用了 MQTT 插件的 RabbitMQ V3.6.10 实例。我有多个 Spring Boot 应用程序,Eclipse PAHO MQTT implementation 使用来自 RabbitMQ 队列的消息。所有消费者都使用MqttDefaultFilePersistence。持久性数据被写入具有 100M 配额的持久性卷上的目录/tmp/mqtt。这是我为连接到 RabbitMQ 队列而编写的代码:
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setCleanSession(false);
options.setAutomaticReconnect(reconnect);
MqttClient mqttClient = new MqttClient(serverUri, "MyConsumerApp", new MqttDefaultFilePersistence("/tmp/mqtt"));
mqttClient.connect(options);
现在我认识到,由于某些原因,如果重新启动 cosumer 应用程序,它不会再使用任何消息。正如我在 RabbitMQ 管理控制台中看到的那样,队列中的消息正在填满。 RabbitMQ 日志中没有错误消息。在这种情况下,唯一可行的解决方案是在 RabbitMQ 管理控制台中删除订阅队列并再次重新启动消费者应用程序。也许有人遇到过类似的问题?有没有人暗示我这里发生了什么?也许 RabbitMQ 使用的 MQTT 插件和保存消息状态的 PAHO 实现存在问题。看起来 RabbitMQ 不知道在消费者断开连接后从哪里恢复,所以它只是停止传递消息。有点悲观的锁定... :-)
【问题讨论】:
标签: java rabbitmq openshift mqtt