【问题标题】:Cannot receive already published messages to subscribed topic on mqtt paho无法接收已发布的消息到 mqtt paho 上的订阅主题
【发布时间】:2014-04-30 14:31:11
【问题描述】:

我正在使用 paho 来发送和接收 mqtt 消息。到目前为止,发送消息没有问题。我收到它们时遇到问题。我的代码是:

     package BenchMQTT;

     import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
     import org.eclipse.paho.client.mqttv3.IMqttToken;
     import org.eclipse.paho.client.mqttv3.MqttCallback;
     import org.eclipse.paho.client.mqttv3.MqttException;
     import org.eclipse.paho.client.mqttv3.MqttMessage;
     import org.eclipse.paho.client.mqttv3.MqttClient;

     public class Test_A_2 implements MqttCallback {

     MqttClient clientR;
     MqttClient clientS;

     public Test_A_2() {
     }

     public static void main(String[] args) throws InterruptedException {
         long startTime = System.currentTimeMillis();
         new Test_A_2().doDemo();
         long endTime = System.currentTimeMillis();
     }

    public void doDemo() throws InterruptedException {
    try {   
    clientS = new MqttClient("tcp://mybroker:1883", "Sender");
    clientR = new MqttClient("tcp://mybroker:1883", "Reiever");
    clientR.connect();
    clientS.connect();
    MqttMessage message = new MqttMessage();

    String messagePayload = "qwertyuiopasdfghjklzxcvbnmqwertyuiopasdfghjk"
            + "lzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasdfghj"
            + "klzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasdfgh"
            + "jklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasdfg"
            + "hjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasd"
            + "fghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopas"
            + "dfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopa"
            + "sdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiop"
            + "asdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuio"
            + "pasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyui"
            + "opasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyu"
            + "iopasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwerty"
            + "uiopasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwert"
            + "nmqwertyuiop";

    clientR.subscribe("BenchMQTT");   
    clientR.setCallback(this);

    for(int i=0;i<10;i++)
    {
    message.setPayload((messagePayload)
            .getBytes());
    System.out.println(i);
    clientS.publish("BenchMQTT", message);
    }
    clientR.disconnect();   
    clientS.disconnect();
    clientR.close();   
    clientS.close();

   } catch (MqttException e)
    {
     System.out.println("ERROR");
    }
 }

     @Override
     public void connectionLost(Throwable cause) {
         // TODO Auto-generated method stub

     }

     @Override
     public void messageArrived(String topic, MqttMessage message)
     {
         System.out.println("Received: " + message.toString());
     }

     @Override
     public void deliveryComplete(IMqttDeliveryToken token) {

     }

     }

这发送和接收消息。

输出:

0
Received: 0
1
Received: 1
2
Received: 2
3
Received: 3
4
Received: 4
5
Received: 5
6
Received: 6
7
Received: 7
8
Received: 8
9
Received: 9

我想发送消息,然后接收它们。有什么帮助吗? 预期输出:

0
1
2
3
4
5
6
7
8
9
Received: 0
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 7
Received: 8
Received: 9

【问题讨论】:

    标签: java mqtt paho


    【解决方案1】:

    这不是 MQTT(或任何发布/订阅消息传递)的工作方式,如果接收器连接到服务器,那么消息将在发送时传递。

    例外情况是,如果接收者连接并订阅了 QOS 大于 0 的主题,然后断开连接并稍后重新连接(没有设置干净会话标志),那么已发布的 QOS 大于 0 的丢失消息将在重新连接时交付。

    另一种可能性是,如果消息已在将保留标志设置为 true 的情况下发布,但只有发布到主题的最后一条消息将在接收客户端订阅时传递。

    【讨论】:

    • 在代理端,我有 10 条消息等待发送给某个订阅者。有没有办法接收它们?我只是在使用 paho mqtt 客户端时遇到了这个问题。对于其他客户,以后接收消息没有问题。有没有可能的情景?场景:发送方发送消息,发送完所有消息后,接收方将订阅和接收消息。
    • 在您编写的代码中,接收客户端订阅主题,然后再使用发布客户端发送消息。如果您将订阅移到 for 循环之后,您可能会更接近您想要的。
    • 更改:for(int i=0;i
    • 再看一遍,它永远不会像你描述的那样工作。 MQTT 代理不会对消息进行排队,除非已经以优于 QOS2 的速度订阅了断开连接的客户端。 (我还编辑了原始答案以涵盖保留的消息案例)
    • 所以。正如您所说,我应该首先使用 QOS2 连接并订阅它。在那之后断开连接。与发件人发送消息。随着消息的发送,我与接收者连接为: MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); clientR.connect(选项);之后,接收者将调用 setCallBack()。我对吗?我一步一步试了,还是不行。
    【解决方案2】:

    以下代码可以满足您的需求,但它会强制 MQTT 以不应该的方式运行。消息队列只是为了确保所有消息都传递给客户端,即使它断开一段时间,消息也将始终尽早传递。

     import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
     import org.eclipse.paho.client.mqttv3.IMqttToken;
     import org.eclipse.paho.client.mqttv3.MqttCallback;
     import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
     import org.eclipse.paho.client.mqttv3.MqttException;
     import org.eclipse.paho.client.mqttv3.MqttMessage;
     import org.eclipse.paho.client.mqttv3.MqttClient;
    
     public class Test_A_2 implements MqttCallback {
    
     MqttClient clientR;
     MqttClient clientS;
    
     public Test_A_2() {
     }
    
     public static void main(String[] args) throws InterruptedException {
         long startTime = System.currentTimeMillis();
         new Test_A_2().doDemo();
         long endTime = System.currentTimeMillis();
     }
    
    public void doDemo() throws InterruptedException {
    try {   
    
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
    
    clientS = new MqttClient("tcp://localhost:1883", "Sender");
    clientR = new MqttClient("tcp://localhost:1883", "Reiever");
    clientR.connect(options);
    clientS.connect();
    clientR.setCallback(this);
    clientR.subscribe("BenchMQTT",2);
    MqttMessage message = new MqttMessage();
    
    String messagePayload = "qwertyuiopasdfghjklzxcvbnmqwertyuiopasdfghjk"
            + "lzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasdfghj"
            + "klzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasdfgh"
            + "jklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasdfg"
            + "hjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopasd"
            + "fghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopas"
            + "dfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiopa"
            + "sdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuiop"
            + "asdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyuio"
            + "pasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyui"
            + "opasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwertyu"
            + "iopasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwerty"
            + "uiopasdfghjklzxcvbnmqwertyuiopasdfghjklzxcvbnmqwert"
            + "nmqwertyuiop";
    
    clientR.disconnect();
    
    for(int i=0;i<10;i++)
    {
    message.setPayload((messagePayload)
            .getBytes());
    System.out.println(i);
    message.setQos(2);
    clientS.publish("BenchMQTT", message);
    }
    
    
    
    clientR.connect(options);
    clientR.setCallback(this);
    clientR.subscribe("BenchMQTT",2);
    
    clientR.disconnect();   
    clientS.disconnect();
    clientR.close();   
    clientS.close();
    
    } catch (MqttException e)
    {
     System.out.println("ERROR");
     e.printStackTrace();
    }
    }
    
     @Override
     public void connectionLost(Throwable cause) {
         // TODO Auto-generated method stub
    
     }
    
     @Override
     public void messageArrived(String topic, MqttMessage message)
     {
         System.out.println("Received: " + message.toString());
     }
    
     @Override
     public void deliveryComplete(IMqttDeliveryToken token) {
    
     }
    
     }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-06-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多