MQTT 协议基本介绍 较详细,含EMQ搭建服务器
使用Eclipse创建Maven项目参考:使用eclipse创建Spring Boot项目
Spring提供了对多种消息中间件的整合,其中也包括MQTT。具体请参见以下链接:
https://docs.spring.io/spring-integration/reference/html/
Spring整合MQTT步骤如下:
1、创建Spring Boot Maven工程,poxm.xml引入如下依赖:
<!-- mqtt -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2、application.properties文件中增加MQTT配置参数
#MQTT Config com.mqtt.url=tcp://mqttServerhost:1883 com.mqtt.inboundclientid=in_clientid com.mqtt.outboundclientid=out_clientid com.mqtt.topics=+/V1/T/+/topic
3、增加MQTT配置类
import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component;import org.springframework.integration.mqtt.support.MqttHeaders; @Configuration public class MqttConfiguration { @Value("${com.mqtt.url}") private String url; @Value("${com.mqtt.topics}") private String topics; @Value("${com.mqtt.inboundclientid}") private String inclientid; @Value("${com.mqtt.outboundclientid}") private String outclientid; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setServerURIs(url); return factory; } /** InBound Begin 消息接收端 ****/ @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound() { String clientid = inclientid + "_" + System.currentTimeMillis(); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(url, clientid); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); adapter.addTopic(topics); return adapter; } // ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于接收消息信息的channel。 @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MqttInboundMessageHandler(); // return new MessageHandler() { // // 消息消费 // @Override // public void handleMessage(Message<?> message) throws MessagingException { // String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); // System.out.println(topic + ":收到消息 " + message.getPayload().toString()); // } // }; } /** InBound End ****/ /** OutBound Begin 消息发送端 ****/ @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } /***** * 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory * * @return */ @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { // 在这里进行mqttOutboundChannel的相关设置 MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(outclientid, mqttClientFactory()); messageHandler.setAsync(true); // 如果设置成true,发送消息时将不会阻塞。 messageHandler.setDefaultTopic("testTopic"); return messageHandler; } @Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MyGateway { // 定义重载方法,用于消息发送 void sendToMqtt(String data); // 指定topic进行消息发送 void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); } /** OutBound End ****/ }
4、创建类MqttInboundMessageHandler实现MessageHandler用于处理MQTT消息
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; public class MqttInboundMessageHandler implements MessageHandler { private Logger logger = LoggerFactory.getLogger(getClass()); @Override public void handleMessage(Message<?> message) throws MessagingException { String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); //String topic = message.getHeaders().get("mqtt_topic").toString(); //低版本使用 mqtt_topic String payload = message.getPayload().toString(); //mqtt消息写入日志 StringBuilder sb = new StringBuilder(); sb.append("\r\n").append("topic:").append(topic).append("\r\n").append("payload:").append(payload); logger.info(sb.toString()); } }