【问题标题】:Publish & Subscribe with Same Connection using Spring Integration MQTT使用 Spring Integration MQTT 以相同的连接发布和订阅
【发布时间】:2018-07-04 17:37:55
【问题描述】:

由于 MQTT 的设计,您只能使用唯一的客户端 ID 建立连接,是否可以使用相同的连接在 Spring Framework/Boot 中使用集成进行发布和订阅?

以这个非常简单的例子,它会连接到MQTT代理来订阅和获取消息,但是如果你想发布一条消息,第一个连接会在消息发送后断开并重新连接。

@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setServerURIs("tcp://localhost:1883");
    factory.setUserName("guest");
    factory.setPassword("guest");
    return factory;
}

// publisher

@Bean
public IntegrationFlow mqttOutFlow() {
    return IntegrationFlows.from(CharacterStreamReadingMessageSource.stdin(),
                    e -> e.poller(Pollers.fixedDelay(1000)))
            .transform(p -> p + " sent to MQTT")
            .handle(mqttOutbound())
            .get();
}

@Bean
public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("siSamplePublisher", mqttClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultTopic("siSampleTopic");
    return messageHandler;
}

// consumer

@Bean
public IntegrationFlow mqttInFlow() {
    return IntegrationFlows.from(mqttInbound())
            .transform(p -> p + ", received from MQTT")
            .handle(logger())
            .get();
}

private LoggingHandler logger() {
    LoggingHandler loggingHandler = new LoggingHandler("INFO");
    loggingHandler.setLoggerName("siSample");
    return loggingHandler;
}

@Bean
public MessageProducerSupport mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer",
            mqttClientFactory(), "siSampleTopic");
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    return adapter;
}

如果您需要在发布消息后等待答案/结果,则使用 2 个单独的连接会变得很困难...

【问题讨论】:

  • 您是在问是否可以创建到 MQTT 的双向连接?
  • 是的,不知何故...请求已发布到主题“请求”。其中一位订阅者正在处理该请求。结果被发送回另一个主题“响应”。希望它是可以理解的?
  • 不会撒谎,这听起来像一团糟。队列到队列模式的整个想法是解耦组件,进一步加强有保证的交付。相反,您想要做的是在 2 个队列之间创建依赖关系,此外,在适配器之间创建依赖关系。如果我真的诚实,我什至不认为这是可能的,或者至少我无法想象如何或为什么会这样做。对不起,伙计。

标签: java spring spring-boot spring-integration mqtt


【解决方案1】:

TL;DR

答案是否定的,当前的 Spring Boot MQTT 集成实现不会(甚至未来的实现也可能不会)。

回答

我面临着同样的情况:我需要一个 MQTT 客户端在入站和出站中都打开,使连接持久并共享相同的配置(客户端 ID、凭据等),使用 Spring Integration Flows 作为尽可能接近设计。

为了实现这一点,我不得不重新实现 MqttPahoMessageDrivenChannelAdapterMqttPahoMessageHandler 以及一个客户端工厂。

MqttPahoMessageDrivenChannelAdapterMqttPahoMessageHandler 中,我必须选择使用异步的(IMqttAsyncClient)来修复使用哪一个。然后我必须查看调用/使用客户端实例的部分代码,以检查它是否已被其他流实例化并检查状态(例如,如果它已经连接,则不要尝试连接它)。

关于客户端工厂,它更容易:我重新实现了getAsyncClientInstance(String url, String clientId),使用urlclientId 的串联作为哈希键,将实例存储到用于检索现有实例的映射中如果其他流程请求它。

它以某种方式起作用,但这只是一个测试,我什至不确定这是一个好方法。 (I've started another StackOverflow question in order to track my specific scenario)。

你能分享一下你是如何处理你的情况的吗?

【讨论】:

  • 由于我无法使用 Spring Integration 真正做到这一点,我只是手动创建了一个 MqttClient 来发送消息。为了接收部分,我转移到了一个外部进程。这不是我真正想要的,但有时你只需要让它工作!
  • @bwillemo:我完全理解你的意思,为了解决问题,我必须实现你的相同方式,但我不想放弃“春天方式”,这就是为什么我即使您的问题已超过一年,也问过您。还是谢谢。
【解决方案2】:

第一个连接会断开,消息发送后会重新连接。

不知道你的意思;两个组件都将保持打开一个持久连接。

由于工厂不连接客户端,适配器连接,它不是为使用共享客户端而设计的。

使用单个连接并不能真正帮助协调请求/回复,因为回复仍会在另一个线程上异步返回。

如果您在请求/回复中有一些数据可用于关联对请求的回复,您可以使用BarrierMessageHandler 来执行该任务。示例见my answer here;它使用标准的相关 id 标头,但 MQTT 无法做到这一点,您需要在消息中添加一些内容。

【讨论】:

  • 嗨,加里!不确定我是否理解正确?您是说我不能有两个集成流,一个出站(发布消息)和一个入站(接收消息)用于相同的客户端 ID 和凭据/证书?因为一个物联网设备有两个不同的客户端(一个发布,另一个订阅)是没有意义的,对吧?谢谢
  • 我不确定你的意思;您不直接发布连接到代理的设备;由于这个答案太老了,我建议您提出一个新问题,详细说明您要达到的目标。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-04-11
  • 1970-01-01
  • 2018-01-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多