【问题标题】:Message MQTT not receive in subscribe client订阅客户端未收到消息 MQTT
【发布时间】:2016-05-26 09:28:57
【问题描述】:

我正在研究 MQTT 协议,我尝试使用 2 个不同的 Java 应用程序发布和订阅。

我的第一个应用程序是“发布”。我在 MQTT 服务器上发布消息。

我的第二个申请是“订阅”。我订阅了一个主题并尝试接收消息。但是我从来没有收到过消息。

当我运行这 2 个应用程序时,我先从“订阅”应用程序开始,然后在运行“发布”应用程序之后。当“发布”应用程序开始时,我失去了与“订阅”应用程序的连接并且我无法接收到我的消息。

在“订阅”应用程序中,client.setCallback(this) 永远不会调用我的方法 messageArrived()。 (见下面的代码)。

这是我的 2 代码应用程序:

发布应用程序: 类 PubClient :

package publishclient;

import java.util.Random;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;



public class PubClient implements MqttCallback {

MqttClient client;
MqttConnectOptions connOpt; 

Random rand = new Random(); 
int nbRandom = 0;
int valMax =151, valMin = 40;

public PubClient() throws MqttException {
String broker = "tcp://88.177.147.17:1883"; // Adress MQTT Server
String clientId = "0bdd-4445-82f3-928f8ddb1887"; // ClientID
String topic1f = "ApplicationRio/capteur"; // Topic
int QoSserveur = 2;

        try {
            String uuid = "ac8da3c6-0bdd-4445-82f3-928f8ddb3294";

            MemoryPersistence persistence = new MemoryPersistence();   

            // Create 2 objects : client and connOpt
            client = new MqttClient(broker, clientId, persistence);
            connOpt = new MqttConnectOptions();

            connOpt.setCleanSession(true);

            client.setCallback(this);

             // Connection to MQTT server
            System.out.println("Connexion a : " + broker + " Publisher");
            client.connect(connOpt);  

            //Create random number for my message
            nbRandom = valMin + rand.nextInt(valMax-valMin);
            System.out.println("nb aleatoire = " + nbRandom);
            String messageAEnvoyer = uuid + "//" + nbRandom;
            System.out.println("Message a envoyer : " + messageAEnvoyer);

            MqttMessage message = new MqttMessage();
            message.setPayload(messageAEnvoyer.getBytes());
            message.setQos(QoSserveur);
            client.publish(topic1f, message);

        } catch(MqttException e) {
            e.printStackTrace();
        }
}

    @Override
    public void connectionLost(Throwable thrwbl) {System.out.println("Perdue connexion");}

    @Override
    public void messageArrived(String string, MqttMessage mm) throws Exception {
    System.out.println("Message recu est : "+ new String(mm.getPayload()));}

    @Override
    public void deliveryComplete(IMqttDeliveryToken imdt) {
    System.out.println("Message delivre au broker");
    }
}

主要(发布):

package publishclient;

import org.eclipse.paho.client.mqttv3.MqttException;

public class PublishClient {

public static void main(String[] args) throws MqttException {     
    PubClient publieur = new PubClient();
}

“订阅”应用程序: 类子客户端:

package subscribeclient;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

public class SubClient implements MqttCallback {

MqttClient clientsub; 
MqttConnectOptions connOpt; 


public SubClient() throws MqttException{
String broker = "tcp://88.177.147.17:1883"; // Adress MQTT Server
String clientId = "0bdd-4445-82f3-928f8ddb1887"; // ClientID
String topic1f = "ApplicationRio/capteur"; // Topic
int QoSserveur = 2;

try{

            // Create 2 objects : client and connOpt
            clientsub = new MqttClient(broker, clientId);
            connOpt = new MqttConnectOptions();

            connOpt.setCleanSession(false);
            connOpt.setKeepAliveInterval(30);

            clientsub.setCallback(this);

             // Connection to MQTT Server
            System.out.println("Connexion a : " + broker + " Subscriber");
            clientsub.connect(connOpt);


            clientsub.subscribe(topic1f,QoSserveur);

            } catch(MqttException e){
            e.printStackTrace();
        }
}

    @Override
    public void connectionLost(Throwable thrwbl) {
        System.out.println("Connexion perdue");
    }

    @Override
    public void messageArrived(String string, MqttMessage message) throws Exception {
        System.out.println("Le message recu est : " + new String(message.getPayload()));  
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken imdt) {
        System.out.println("Message arrive");
    }
}

主(订阅):

package subscribeclient;

import org.eclipse.paho.client.mqttv3.MqttException;

public class SubscribeClient {

public static void main(String[] args) throws MqttException {
    SubClient subscriber = new SubClient();
}

}

我的 2 个应用程序需要同时运行,而且我不需要断开连接,因为我一直在运行应用程序。

那么,您知道为什么在我运行“发布客户端”时我的“订阅客户端”会断开连接以及为什么我无法在“订阅消息”中收到我的消息吗?

我使用 org.eclipse.paho.client.mqttv3-1.0.2.jar 作为 MQTT 库。

【问题讨论】:

    标签: java server connection publish-subscribe mqtt


    【解决方案1】:

    客户端 ID 在所有客户端之间必须是唯一的。您为发布者和订阅者使用了相同的客户端 ID,因此代理将在发布者连接时启动订阅者。

    【讨论】:

    • 太好了,我已经更改了 2 个应用程序的 ClientID,并且我输入了不同的 clientID,它正在工作!感谢您解决了我的问题!