【问题标题】:Offline messages are not consumed in Moquette with Paho Client在带有 Paho 客户端的 Moquette 中不使用离线消息
【发布时间】:2018-08-15 12:16:00
【问题描述】:

我有一个关于通过 Eclipse Paho 客户端在 Moquette 服务器中使用离线 MQTT 消息的问题。

以下是我遵循的步骤。

  1. 创建并启动了 Moquette MQTT 代理。
  2. 使用 Eclipse Paho 客户端创建了一个简单的 MQTT 消费者应用程序。
  3. 将消费者设置为使用主题上的数据:“devices/reported/#”,QOS:1 和 CleanSession:False
  4. 创建了一个简单的 MQTT 数据发布者,以使用 Eclipse Paho 将数据发布到 MQTT 代理。
  5. 使用 MQTT 数据发布者将消息发布到:“devices/reported/client_1”主题,QOS:1

以上步骤成功,没有任何问题。

然后我停止了我的消费者应用程序并将 MQTT 数据发送到具有相同主题的代理。使用我的发布者应用程序 - 服务器能够接收这些消息,但此时没有任何消费者可以使用此消息,因为我已经停止了我的消费者。 然后我再次启动了我的消费者应用程序。它已成功连接到代理,但是在消费者关闭时它没有收到我发送给代理的任何消息。

我是否需要对我的 Moquette 服务器进行任何特定配置以保存数据(使用干净的会话:false)? 还是我错过了什么?

请在下面找到我的示例代码,

Moquette 服务器初始化

package com.gbids.mqtt.moquette.main;

import com.gbids.mqtt.moquette.server.PublishInterceptor;
import io.moquette.interception.InterceptHandler;
import io.moquette.server.Server;
import io.moquette.server.config.IConfig;
import io.moquette.server.config.MemoryConfig;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class ServerLauncher {

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        final IConfig configs = new MemoryConfig(props);

        final Server mqttBroker = new Server();
        final List<? extends InterceptHandler> userHandlers = Arrays.asList(new PublishInterceptor());
        mqttBroker.startServer(configs, userHandlers);

        System.out.println("moquette mqtt broker started, press ctrl-c to shutdown..");
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.println("stopping moquette mqtt broker..");
                mqttBroker.stopServer();
                System.out.println("moquette mqtt broker stopped");
            }
        });
    }
}

MQTT 消费者

package com.gbids.mqtt.moquette.main;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class ConsumerLauncher implements MqttCallback {

    private static final String topicPrefix = "devices/reported";
    private static final String broker = "tcp://0.0.0.0:1883";
    private static final String clientIdPrefix = "consumer";

    public static void main(String[] args) throws MqttException {
        final String clientId = "consumer_1";
        MqttClient sampleClient = new MqttClient(broker, clientId, new MemoryPersistence());
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setCleanSession(false);
        sampleClient.connect(connOpts);
        sampleClient.subscribe(topicPrefix + "/#", 1);
        sampleClient.setCallback(new ConsumerLauncher());
    }

    public void connectionLost(Throwable throwable) {
        System.out.println("Consumer connection lost : " + throwable.getMessage());
    }

    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        System.out.println("Message arrived from topic : " + s + " | Message : " + new String(mqttMessage.getPayload()) + " | Message ID : " +mqttMessage.getId());
    }

    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("Delivery completed from : " + clientIdPrefix + "_1");
    }
}

MQTT 发布者

package com.gbids.mqtt.moquette.main;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class ClientLauncher {

    private static final String content = "{\"randomData\": 25}";
    private static final String willContent = "Client disconnected unexpectedly";
    private static final String broker = "tcp://0.0.0.0:1883";
    private static final String clientIdPrefix = "client";

    public static void main(String[] args) throws Exception{
        sendDataWithQOSOne();
        System.exit(0);
    }

    private static void sendDataWithQOSOne(){
        try {
            final String clientId = "client_1";
            MqttClient sampleClient = new MqttClient(broker, clientId, new MemoryPersistence());
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(false); // for publisher - this is not needed I think
            sampleClient.connect(connOpts);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(1);
            final String topic = "devices/reported/" + clientId;
            sampleClient.publish(topic, message);
            System.out.println("Message published from : " + clientId + " with payload of : " + content);
            sampleClient.disconnect();
        } catch (MqttException me) {
            me.printStackTrace();
        }
    }
}

【问题讨论】:

    标签: java mqtt paho moquette


    【解决方案1】:

    在您的情况下,您需要在ClientLauncher(发布者)中创建MqttMessage 时将retained 标志设置为true。默认值为false,如documentation

    ...    
    message.setRetained(true)
    ...
    

    设置此标志可让消息保留在代理上并发送到新连接的客户端。请注意,代理只保留主题的最后一条消息。没有办法为特定主题保留多条消息。

    【讨论】:

    • 设置保留标志只保留给定主题上发布的最后一条消息(设置了保留标志)。它不会为之前已订阅主题的离线客户端排队消息。
    • @hardlib 是的,当然它只保留最后一条消息。 OP 并未声明应保留所有消息。如果是这种情况,无论如何 MQTT 都是错误的选择,因为 MQTT 不是为此而设计的。 Broker 可以理解为一种状态数据库。 (编辑了尊重这一点的答案)
    猜你喜欢
    • 2016-06-28
    • 1970-01-01
    • 2016-02-28
    • 2018-10-18
    • 1970-01-01
    • 2016-06-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多