【问题标题】:Proper MQTT subscription code that maintains persistence维护持久性的正确 MQTT 订阅代码
【发布时间】:2023-04-01 16:30:01
【问题描述】:

我正在寻找订阅给定主题的 MQTT 客户端的 java 代码,在该主题上发布的每条消息都应该只到达客户端一次。我编写了许多代码,并且在所有情况下消息都正确传递给客户端当它连接到代理时,但如果订阅的客户端与代理断开连接一段时间然后再次连接回来,它不会收到在未连接期间发送的消息,并且我已设置干净会话标志也为假但仍然无法正常工作,下面给出了我使用的代码

import org.fusesource.hawtbuf.*;
import org.fusesource.mqtt.client.*;

/**
 * Uses an callback based interface to MQTT.  Callback based interfaces
 * are harder to use but are slightly more efficient.
 */
class Listener {

    public static void main(String []args) throws Exception {

        String user = env("APOLLO_USER", "admin");
        String password = env("APOLLO_PASSWORD", "password");
        String host = env("APOLLO_HOST", "localhost");
        int port = Integer.parseInt(env("APOLLO_PORT", "61613"));
        final String destination = arg(args, 1, "subject");


        MQTT mqtt = new MQTT();
        mqtt.setHost(host, port);
        mqtt.setUserName(user);
        mqtt.setPassword(password);
    mqtt.setCleanSession(false);
    mqtt.setClientId("newclient");

        final CallbackConnection connection = mqtt.callbackConnection();
        connection.listener(new org.fusesource.mqtt.client.Listener() {
            long count = 0;
            long start = System.currentTimeMillis();

            public void onConnected() {
            }
            public void onDisconnected() {
            }
            public void onFailure(Throwable value) {
                value.printStackTrace();
                System.exit(-2);
            }
            public void onPublish(UTF8Buffer topic, Buffer msg, Runnable ack) {
                System.out.println("Nisha Messages : " + msg);
                System.out.println("Nisha topic" + topic);
                System.out.println("Nisha Receive acknowledgement : " + ack);
                String body = msg.utf8().toString();
                if("SHUTDOWN".equals(body)) {
                    long diff = System.currentTimeMillis() - start;
                    System.out.println(String.format("Received %d in %.2f seconds", count, (1.0*diff/1000.0)));
                    connection.disconnect(new Callback<Void>() {
                        @Override
                        public void onSuccess(Void value) {
                            System.exit(0);
                        }
                        @Override
                        public void onFailure(Throwable value) {
                            value.printStackTrace();
                            System.exit(-2);
                        }
                    });
                } else {
                    if( count == 0 ) {
                        start = System.currentTimeMillis();
                    }
                    if( count % 1000 == 0 ) {
                        System.out.println(String.format("Received %d messages.", count));
                    }
                    count ++;
                }
            }
        });
        connection.connect(new Callback<Void>() {
            @Override
            public void onSuccess(Void value) {
                System.out.println("connected in :::: ");
                Topic[] topics = {new Topic(destination, QoS.AT_MOST_ONCE)};
                connection.subscribe(topics, new Callback<byte[]>() {
                    public void onSuccess(byte[] qoses) {
                    }
                    public void onFailure(Throwable value) {
                        value.printStackTrace();
                        System.exit(-2);
                    }
                });
            }
            @Override
            public void onFailure(Throwable value) {
                value.printStackTrace();
                System.exit(-2);
            }
        });

        // Wait forever..
        synchronized (Listener.class) {
            while(true)
                Listener.class.wait();
        }
    }

    private static String env(String key, String defaultValue) {
        String rc = System.getenv(key);
        if( rc== null )
            return defaultValue;
        return rc;
    }

    private static String arg(String []args, int index, String defaultValue) {
        if( index < args.length )
            return args[index];
        else
            return defaultValue;
    }
}

我在这里做错了吗?

【问题讨论】:

    标签: java mqtt subscribe


    【解决方案1】:

    它不接收在它未连接期间发送的消息

    MQTT 保留所有消息。如果客户端下线,未传递的消息将丢失。保留机制仅保留发布到主题的最后消息。

    你可以read more in the specs3.3.1.3 RETAIN

    【讨论】:

    • 如果客户端连接时将“干净会话”设置为 false,则 qos 1/2 消息应在其离线时排队。
    • @knolleary 你有这方面的参考吗? AFAIK 'clean session' 只保留对主题的订阅,而不是消息(参见:www-01.ibm.com/support/knowledgecenter/SSFKSJ_7.1.0/…
    • 参见标准中的一致性声明MQTT-3.1.2-5:docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/…
    • 谢谢你们的回复。我找到了答案,如果 clean_session 设置为 false,mqtt 实际上会在客户端未连接时保留所有消息,并且我找到了一种运行上述程序的方法,以便我的任何消息都没有丢失。
    • 如果 QOS 级别设置为 1 或 2 并且在干净会话标志设置为 false 的情况下完成重新连接,MQTT 将能够将断开连接期间发布的消息传递给订阅者。更高级别的 QoS 更可靠,但涉及更高的延迟和更高的带宽要求。 0:代理/客户端将发送一次消息,无需确认。 1:代理/客户端将至少传递一次消息,需要确认。 2:代理/客户端将通过四步握手将消息传递一次。 mosquitto.org/man/mqtt-7.html
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-01-14
    • 2013-12-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-07-11
    相关资源
    最近更新 更多