【发布时间】:2018-07-04 17:37:55
【问题描述】:
由于 MQTT 的设计,您只能使用唯一的客户端 ID 建立连接,是否可以使用相同的连接在 Spring Framework/Boot 中使用集成进行发布和订阅?
以这个非常简单的例子,它会连接到MQTT代理来订阅和获取消息,但是如果你想发布一条消息,第一个连接会在消息发送后断开并重新连接。
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs("tcp://localhost:1883");
factory.setUserName("guest");
factory.setPassword("guest");
return factory;
}
// publisher
@Bean
public IntegrationFlow mqttOutFlow() {
return IntegrationFlows.from(CharacterStreamReadingMessageSource.stdin(),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(p -> p + " sent to MQTT")
.handle(mqttOutbound())
.get();
}
@Bean
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("siSamplePublisher", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("siSampleTopic");
return messageHandler;
}
// consumer
@Bean
public IntegrationFlow mqttInFlow() {
return IntegrationFlows.from(mqttInbound())
.transform(p -> p + ", received from MQTT")
.handle(logger())
.get();
}
private LoggingHandler logger() {
LoggingHandler loggingHandler = new LoggingHandler("INFO");
loggingHandler.setLoggerName("siSample");
return loggingHandler;
}
@Bean
public MessageProducerSupport mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer",
mqttClientFactory(), "siSampleTopic");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
return adapter;
}
如果您需要在发布消息后等待答案/结果,则使用 2 个单独的连接会变得很困难...
【问题讨论】:
-
您是在问是否可以创建到 MQTT 的双向连接?
-
是的,不知何故...请求已发布到主题“请求”。其中一位订阅者正在处理该请求。结果被发送回另一个主题“响应”。希望它是可以理解的?
-
不会撒谎,这听起来像一团糟。队列到队列模式的整个想法是解耦组件,进一步加强有保证的交付。相反,您想要做的是在 2 个队列之间创建依赖关系,此外,在适配器之间创建依赖关系。如果我真的诚实,我什至不认为这是可能的,或者至少我无法想象如何或为什么会这样做。对不起,伙计。
标签: java spring spring-boot spring-integration mqtt