MQTT 协议基本介绍   较详细,含EMQ搭建服务器

MQTT--入门

 

使用Eclipse创建Maven项目参考:使用eclipse创建Spring Boot项目

Spring提供了对多种消息中间件的整合,其中也包括MQTT。具体请参见以下链接:

https://docs.spring.io/spring-integration/reference/html/

Spring整合MQTT步骤如下:

1、创建Spring Boot Maven工程,poxm.xml引入如下依赖:

        <!-- mqtt -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

 

2、application.properties文件中增加MQTT配置参数

#MQTT Config
com.mqtt.url=tcp://mqttServerhost:1883
com.mqtt.inboundclientid=in_clientid
com.mqtt.outboundclientid=out_clientid
com.mqtt.topics=+/V1/T/+/topic

 

3、增加MQTT配置类

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import org.springframework.integration.mqtt.support.MqttHeaders;


@Configuration
public class MqttConfiguration {
    
    @Value("${com.mqtt.url}")
    private String url;

    @Value("${com.mqtt.topics}")
    private String topics;

    @Value("${com.mqtt.inboundclientid}")
    private String inclientid;

    @Value("${com.mqtt.outboundclientid}")
    private String outclientid;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs(url);
        return factory;
    }

    /** InBound Begin 消息接收端 ****/
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        String clientid = inclientid + "_" + System.currentTimeMillis();
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(url, clientid);

        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        adapter.addTopic(topics);

        return adapter;
    }

    // ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于接收消息信息的channel。
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MqttInboundMessageHandler();
//        return new MessageHandler() {
//            // 消息消费            
//            @Override
//            public void handleMessage(Message<?> message) throws MessagingException {
//                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
//                System.out.println(topic + ":收到消息 " + message.getPayload().toString());
//            }
//        };
    }

    /** InBound End ****/

    /** OutBound Begin 消息发送端 ****/

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /*****
     * 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
     * 
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        // 在这里进行mqttOutboundChannel的相关设置
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(outclientid, mqttClientFactory());
        messageHandler.setAsync(true); // 如果设置成true,发送消息时将不会阻塞。
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Component
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        // 定义重载方法,用于消息发送
        void sendToMqtt(String data);

        // 指定topic进行消息发送
        void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);

        void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

    }
    /** OutBound End ****/

}
View Code

 

4、创建类MqttInboundMessageHandler实现MessageHandler用于处理MQTT消息  

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

public class MqttInboundMessageHandler implements MessageHandler {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
        //String topic = message.getHeaders().get("mqtt_topic").toString();  //低版本使用 mqtt_topic
        String payload = message.getPayload().toString();        
        //mqtt消息写入日志
        StringBuilder sb = new StringBuilder();
        sb.append("\r\n").append("topic:").append(topic).append("\r\n").append("payload:").append(payload);
        logger.info(sb.toString());
    }
}
View Code

相关文章:

  • 2022-12-23
  • 2021-08-28
  • 2021-05-04
  • 2021-08-22
  • 2022-12-23
  • 2021-11-21
  • 2021-10-02
  • 2021-09-30
猜你喜欢
  • 2022-12-23
  • 2021-07-10
  • 2021-05-16
  • 2021-10-15
  • 2022-12-23
  • 2022-12-23
  • 2021-04-12
相关资源
相似解决方案