【问题标题】:Understanding spring cloud messaging with rabbitmq用 rabbitmq 理解 Spring Cloud 消息传递
【发布时间】:2018-08-18 11:09:12
【问题描述】:

我认为我在理解 Spring Cloud 消息传递方面存在问题,并且无法找到我面临的“问题”的答案。

我有以下设置(使用 spring-boot 2.0.3.RELEASE)。

application.yml

spring:
    rabbitmq:
      host: localhost
      port: 5672
      username: guest
      password: guest
      virtual-host: /
    cloud:
      stream:
        bindings:
          input:
            destination: foo
            group: fooGroup
          fooChannel:
            destination: foo

服务类

@Autowired
FoodOrderController foodOrderController;

@Bean
public CommandLineRunner runner() {
    return (String[] args) -> {
       IntStream.range(0,50).forEach(e -> foodOrderController.orderFood());
    };
}

@StreamListener(target = FoodOrderSource.INPUT)
public void processCheapMeals(String meal){
    System.out.println("This was a great meal!: "+ meal);
}

@StreamListener(target = FoodOrderSource.INPUT)
public void processCheapMeals1(String meal){
    System.out.println("This was a great meal!: "+ meal);
}

食品订单控制器

public class FoodOrderController {

    @Autowired
    FoodOrderSource foodOrderSource;

    public String orderFood(){
        var foodOrder = new FoodOrder();
        foodOrder.setCustomerAddress(UUID.randomUUID().toString());
        foodOrder.setOrderDescription(UUID.randomUUID().toString());
        foodOrder.setRestaurant("foo");
        foodOrderSource.foodOrders().send(MessageBuilder.withPayload(foodOrder).build());
       // System.out.println(foodOrder.toString());
        return "food ordered!";
    }
}

FoodOrderSource

public interface FoodOrderSource {
    String INPUT = "foo";
    String OUTPUT = "fooChannel";

    @Input("foo")
    SubscribableChannel foo();
    @Output("fooChannel")
    MessageChannel foodOrders();
}

FoodOrderPublisher

@EnableBinding(FoodOrderSource.class)
public class FoodOrderPublisher {
}

设置工作正常,但StreamListener 都收到相同的消息。所以一切都被记录了两次。阅读文档,它说在队列绑定中指定group,两个侦听器都将在组内注册,并且只有一个侦听器会收到一条消息。我知道上面的示例不合理,但我想模拟具有多个侦听器设置的多节点环境。

为什么两个听众都收到了消息?以及如何确保在设置组中只收到一次消息?

根据文档,默认情况下消息也应该是自动确认的,但我找不到任何表明消息实际上得到确认的东西。我在这里遗漏了什么吗?

这里有一些rabbit admin的截图

【问题讨论】:

    标签: java spring spring-boot rabbitmq spring-messaging


    【解决方案1】:

    阅读文档,它说在队列绑定中指定一个组,两个侦听器都将在组内注册,并且只有一个侦听器会收到一条消息。

    当侦听器位于不同的应用程序实例中时,情况确实如此。当同一实例中有多个侦听器时,它们都会收到相同的消息。这通常与condition 一起使用,每个听众都可以表达对他们感兴趣的餐点的兴趣。Documented here

    基本上,竞争消费者是绑定本身,它将消息分派到应用程序中的实际@StreamListeners。

    因此,您不能以这种方式“模拟具有多个侦听器设置的多节点环境”。

    但我找不到任何表明消息确实得到确认的信息

    你这是什么意思?如果消息处理成功,容器会确认消息并将其从队列中移除。

    【讨论】:

    • 谢谢!我已经创建了另一个 spring boot 应用程序,像第一个实例一样设置它,一次又一次地启动它们,所有消息都记录在两个实例中。你能确认我在 application.yml 中的配置是正确的吗?
    • 看起来不错;您是否也将ApplicationRunner 添加到两个实例中?这会使消息加倍。您可以在 RabbitMQ Admin UI (http://localhost:15672) 上查看队列 @9​​87654326@ 及其消费者。
    • 我添加了一些图片,我没有看到组和队列之间正在创建连接,是吗? foo exchange 似乎有两个队列,但我没有连接到该组。我没有将 ApplicationRunner 添加到第二个应用程序中,它只在第一个应用程序中运行。
    • 奇怪 - 你有 2 个匿名队列,这意味着 group 属性由于某种原因没有被应用。队列正确绑定交换foo;不应该有交流fooGroup。应该有一个队列foo.fooGroup,两个消费者都连接到该队列,队列绑定到foo。如果您无法弄清楚,请将您的项目发布到 github 之类的地方;我可以看看有什么问题。但我今天只会在很短的时间里,所以可能是今晚或明天 (EDT)。
    • 啊——这是因为@Input("foo")——你需要bindings: foo:而不是bindings: input:。抱歉,我没有注意到绑定名称不匹配。如果foo 没有显式绑定,则使用默认值(目标 = foo,匿名消费者)。
    【解决方案2】:

    正确答案已经在帖子中回复了,但您仍然可以查看:

    https://github.com/jinternals/spring-cloud-stream

    【讨论】:

    • 谢谢,这看起来很有帮助,可以解决我的下一个挑战!
    • 如果您觉得 repo 有帮助,请点赞,祝您编码愉快 :)
    猜你喜欢
    • 1970-01-01
    • 2015-03-23
    • 2014-02-17
    • 2015-10-02
    • 2018-10-09
    • 2021-03-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多