【问题标题】:Receive message when consumer is up in spring-cloud-stream app当消费者在 spring-cloud-stream 应用程序中启动时接收消息
【发布时间】:2018-10-14 14:18:20
【问题描述】:

我正在玩 Spring-cloud-stream 和 RabbitMQ。

我有一个产生消息的 REST 端点。

@SpringBootApplication
@EnableBinding(MyProcessor.class)
public class ProducerDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerDemoApplication.class, args);
    }
}

@RestController
class ProducerController {

    @Autowired
    MyProcessor myProcessor;

    @RequestMapping(value = "sendmessage/{message}", method = RequestMethod.GET)
    public String sendMessage(@PathVariable("message") String message) {
        myProcessor.anOutput().send(MessageBuilder.withPayload(message).build());
        return "sent";  
    }

}

interface MyProcessor {
    String INPUT = "myInput";

    @Output("myOutput")
    MessageChannel anOutput();
}

通过另一个应用程序,我正在使用这些消息。

@StreamListener(MyProcessor.INPUT)
public void eventHandler(String message) {
    System.out.println("**************  Message received => "+message);
}

当两个应用程序都已启动并正在运行时。我能够发布消息并在消费者处使用它。

我在以下场景中面临的问题:

我故意让消费者失望并通过生产者发布消息。现在,当消费者启动时,它没有收到任何消息。

我想 RabbitMQ 保证消息传递。

Github 链接
https://github.com/govi20/producer-demo
https://github.com/govi20/consumer-demo

【问题讨论】:

  • 你能把你的应用发布到 Github 上让我们看看吗?您在上面提供的内容中缺少某些内容。例如String INPUT = "myInput"; - 我没看到和@Input 配置等
  • @OlegZhurakousky 我在原始问题中添加了 GitHub 链接。

标签: rabbitmq spring-cloud-stream


【解决方案1】:

正如我之前提到的,您已经在“myInput”中配置了错误配置,因为您没有 @Input 配置,这会导致消费者启动期间出现 A component required a bean named 'myInput' that could not be found. 错误。 因此,在消费者方面需要这样的东西

 interface MyProcessor {
    String INPUT = "myInput";

    @Input("myInput")
    MessageChannel myInput();
}

如果您没有定义group,则更多信息会导致 Rabbit 端出现匿名队列(类似于 myInput.anonymous.pZg03h0zQ2-SHLh1_QL8DQ),这实际上会导致每次启动时队列的名称都不同,所以

spring.cloud.stream.bindings.myInput.destination=myInput
spring.cloud.stream.bindings.myInput.group=myGroup

将导致队列名称myInput.myGroup 在启动之间保持一致。

另外,在生产者端myOutput 会导致创建一个没有路由到上述(或任何其他)队列的 Rabbit Exchange,因此 Rabbit 会丢弃消息,因此您不可能接收来自生产者,直到您在 myOutput 交换和 myInput.myGroup 队列之间创建路由。 但是,如果您按照我上面描述的方式配置输入,spring-cloud-stream 还将创建一个名为 myInput 的交换,该交换将自动路由到 myInput.myGroup,因此如果您将生产者更改为发送到该目的地,您将收到消费者的消息。

【讨论】:

    【解决方案2】:

    您需要一个关于消费者输入绑定的组。否则它是匿名的并绑定一个自动删除队列,该队列仅在消费者运行时存在。

    【讨论】:

    • 我不知道你的意思,它似乎与这个问题无关。这里的管理员不喜欢扩展评论。如果您还有其他需要,请提出新问题。
    • 另外,请随意浏览我们提供的大量示例库 github.com/spring-cloud/spring-cloud-stream-samples
    • @OlegZhurakousky 感谢分享链接。
    猜你喜欢
    • 2021-12-30
    • 2020-03-28
    • 1970-01-01
    • 2018-01-04
    • 2021-03-26
    • 2018-05-13
    • 1970-01-01
    • 2016-06-21
    • 2017-06-22
    相关资源
    最近更新 更多