【问题标题】:how to publish MQTT messages concurrently using java client?如何使用 java 客户端同时发布 MQTT 消息?
【发布时间】:2019-09-26 01:51:56
【问题描述】:

我正在尝试使用 5 个 java 客户端同时发布 MQTT 消息,以便每个 java 客户端同时将特定主题的 1000 条消息发布到 MQTT 代理 (HIVEMQ)

我打开了多个线程,每个线程创建一个 mqtt 客户端并使用 ssl 连接到代理并尝试同时发布 1000 条消息,正在发送消息但所有连接都没有成功到代理,我不断收到异常

Client is not connected (32104)
at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:31)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:199)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1355)
    at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:583)
    at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:575)
    at com.test.MqttPublishSample.publishMessages(MqttPublishSample.java:122)
    at com.test.MqttPublishSample.lambda$start$0(MqttPublishSample.java:74)
    at java.base/java.lang.Thread.run(Thread.java:834)
public class MqttPublishSample {

    public static void main(String... args) throws InterruptedException {

        new MqttPublishSample().start();

    }

  public void start() throws InterruptedException {


        for(int i=0;i<5;i++){

            new Thread(()->{
                MqttClient client = null;
                try {
                    client = obtainConnection();//code to obtain connection using MqttClient
                    publishMessages(client);//code to publish message using simple for loop 

                } catch (MqttException e) {
                    e.printStackTrace();
                }

            }).start();
        }
    }
public MqttClient obtainConnection() throws MqttException {
        String clientId = "sslTestClient"+ThreadLocalRandom.current().nextInt(0,5);
        MqttClient client = null;
        try {
            client = new MqttClient("ssl://localhost:8883", clientId, new MemoryPersistence());
        } catch (MqttException e) {
            e.printStackTrace();
        }

        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName("user1");
        mqttConnectOptions.setPassword("pass1".toCharArray());
        try {
            mqttConnectOptions.setSocketFactory(getTruststoreFactory());
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("connecting...");
        client.connect(mqttConnectOptions);
        return client;
    }

我希望所有客户端都能成功连接到代理并发布消息,无一例外

【问题讨论】:

  • 能发一下obtainConnection方法吗?
  • 更新了方法
  • 您可能达到了允许的最大飞行消息数,即 10,您可以使用 MqttConnectOptions.setMaxInflight 将其更改为更高的值。如果这没有帮助,可以通过打开日志从客户端获取更多有用信息来获得更多调试信息:wiki.eclipse.org/Paho/Log_and_Debug_in_the_Java_client

标签: java mqtt iot paho hivemq


【解决方案1】:

可能是您在线程上使用相同的 clientID,因此,服务器将断开重复。当您使用 LocalThreadRandom 时,有可能发生冲突(足够大,因为只有 5 个选择)。您可以使用 generateClientId() 提供的唯一标识符,或者在线程之间共享一个方法来跟踪它们。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-12-26
    • 1970-01-01
    • 2022-08-21
    相关资源
    最近更新 更多