【问题标题】:How to stop repeated subscribing of retained messages in spring mqtt integration once it is received收到消息后如何在spring mqtt集成中停止重复订阅保留消息
【发布时间】:2019-04-04 07:46:00
【问题描述】:

订阅保留主题时获取重复的保留消息。

我在我的物联网项目中使用了 spring mqtt 集成。 在这里,一旦收到保留的消息,它就会继续订阅,直到我将空白消息发布到同一主题,并将保留标志设置为 true。 我注意到当我在终端中使用 mqtt 命令执行相同的过程时,订阅保留的主题时,它只订阅一次,不会发生重复订阅。

我使用下面的代码通过#订阅所有主题

@Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
    @Bean
    public DefaultMqttPahoClientFactory clientfactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("username");
        options.setPassword("password".toCharArray());
        options.setCleanSession(false);
        //options.setCleanSession(true);
        //options.setServerURIs(new String[] { "tcp://localhost" });
        options.setServerURIs(new String[] { "url" });
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    public MqttPahoMessageDrivenChannelAdapter inbound() {

        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("admin",
                clientfactory(), "#");
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        /*adapter.setc*/
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")

    public MessageHandler handler() {
        return new MessageHandler() {

            public void handleMessage(Message<?> message) throws MessagingException {

            mqttSubscriptionProcessor.processSubscription(message);


            }

        };
    }

我使用此命令发布了保留消息

mosquitto_pub -u admin -P pwd -t hello/topic  -m "test msg" -r -d

在eclipse控制台中的结果是

{mqtt_receivedRetained=true, id=48afaec5-debf-4927-ce06-a80556e479ac, mqtt_duplicate=false, mqtt_receivedTopic=hello/topic, mqtt_receivedQos=0, timestamp=1554363853214}
test msg

{mqtt_receivedRetained=true, id=48afaec5-debf-4927-ce06-a80556e479ac, mqtt_duplicate=false, mqtt_receivedTopic=hello/topic, mqtt_receivedQos=0, timestamp=1554363853214}
test msg

{mqtt_receivedRetained=true, id=48afaec5-debf-4927-ce06-a80556e479ac, mqtt_duplicate=false, mqtt_receivedTopic=hello/topic, mqtt_receivedQos=0, timestamp=1554363853214}
test msg 

{mqtt_receivedRetained=true, id=48afaec5-debf-4927-ce06-a80556e479ac, mqtt_duplicate=false, mqtt_receivedTopic=hello/topic, mqtt_receivedQos=0, timestamp=1554363853214}
test msg

这里我只需要订阅一次保留的主题,必须更改spring集成代码中的任何更改。

【问题讨论】:

    标签: java spring spring-integration mqtt mosquitto


    【解决方案1】:

    这就是保留消息的工作方式,当客户订阅匹配的主题时,将保留位设置为主题的最后一条发布的消息将始终首先传递给客户端,然后再发送任何新消息。

    如果您不希望消息被保留(因此始终传递),则不要在发布时设置保留位。

    否则,正如您所发现的,您可以通过发布带有空负载并将保留位设置为同一主题的消息来清除主题的保留消息。

    或者您可以过滤客户端中的消息,因为您始终可以在消息传递时检查是否在消息上设置了保留标志。

    至于 Spring 方面,您似乎正在创建 4 个客户端,因此每个客户端都在订阅时接收消息。您可以通过查看代理日志来证明这一点,如果您以详细模式运行 mosquitto,它将显示它传递的每条消息。

    【讨论】:

    • 感谢您的回复。这里只有一个客户端连接到该主题的保留消息。我的意思是,每当客户端确实订阅了某个主题的保留消息时,它都不会停止一旦收到保留的消息就订阅。我不希望在收到保留消息后一次又一次地订阅它。有没有办法在不发送空有效负载的情况下停止这个过程?
    • 不,正如我所说,保留消息就是这样工作的,您是否不希望每次连接时都传递该消息,不要设置保留位
    • 但是如果我使用终端订阅它不会一次又一次地发生。我认为spring会在某些时间段连接和断开mqtt服务器。
    猜你喜欢
    • 1970-01-01
    • 2022-01-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-04-10
    • 1970-01-01
    • 2015-01-12
    • 1970-01-01
    相关资源
    最近更新 更多