【问题标题】:Bind RabbitMQ consumer using Spring Cloud Stream to an existing queue使用 Spring Cloud Stream 将 RabbitMQ 消费者绑定到现有队列
【发布时间】:2017-04-28 13:52:16
【问题描述】:

我使用 RabbitMQ web-UI 创建了一个主题交换 TX 并绑定到交换两个队列 TX.Q1TX.Q2,每个都与路由键 rk1rk2 相应地绑定,并产生很少的消息到交换。

现在我想使用 Spring Cloud Stream 创建一个仅从 Q1 获取消息的消费者。 我尝试使用配置:

spring.cloud.stream.bindings.input.destination=TX
spring.cloud.stream.bindings.input.group=Q1

以及消费消息的方法的注解@StreamListner(Sink.INPUT)

结果我可以看到消费者创建了一个具有相同名称的队列(或绑定)TX.Q1,但新队列/绑定的 Routing-Key 是 #.
如何通过 Spring Cloud Stream 配置将使用来自预定义队列的消息的消费者(仅使用 rk1 路由的消息)。

【问题讨论】:

    标签: java spring rabbitmq spring-cloud-stream


    【解决方案1】:

    所以现在,Garry Russell 建议的解决方法已经为我解决了这个问题。

    我以这种方式使用@RabbitListener 而不是@StreamListenet
    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "TX.Q1", durable = "true"), exchange = @Exchange(value = "TX", type = "topic", durable = "true"), key = "rk1")

    因此,预定义队列 TX.Q1 使用绑定密钥 rk1 绑定到交换 TX

    等待Spring Cloud Steream issue 的更新。

    【讨论】:

    • 这个解决方法已经有一段时间了,我想知道您是否提出了一个更干净的解决方案(例如,使用属性而不是注释)。
    【解决方案2】:

    我想我使用@StreamListener 找到了解决方案,而不是使用解决方法。一切都是在配置中完成的,而不是在代码中。

    我使用的配置如下(在 .yml 中,但您可以在 .properties 中轻松翻译):

    spring:
      cloud:
        stream:
          bindings:
            input:
              binder: <binder_name>
              destination: TX
              group: Q1
          binders:
            <binder_name>:
              type: rabbit
              environment:
                spring:
                  rabbitmq:
                    host: <host>
                    port: <port>
                    virtual-host: <vhost>
                    username: <username>
                    password: <password>
          rabbit:
            bindings:
              input:
                consumer:
                  binding-routing-key: rk1
                  exchange-name: TX
                  queue-name-group-only: true
                  bind-queue: true
                  exchange-durable: true
                  exchange-type: topic
    

    使用这种方法,您不必编写特定代码来让 RabbitMQ 使用者连接到您的集群,这应该可以解决您的问题。

    希望这会有所帮助。

    【讨论】:

    • 它没有按预期工作。例如,我确实提供了上述设置,但每次重新启动应用程序时,我注意到在 RabbitMQ 中使用路由键 # 创建了一个新绑定——即使我已经定义了另一个路由键。因此,每当我发送消息时,即使没有路由密钥 - 我不希望我的听众收到该消息 - 但是,由于也自动创建了新绑定 (#),如原始问题中所述 - 我无法发送消息特定的路由键。
    【解决方案3】:

    Spring Cloud Stream 在内部将消费者端点的路由器键设置为目标名称 (exchange 名称) 本身,或者在静态分区的情况下基于 partition 标头的路由。

    我认为this github 问题可能与您的情况有关。

    【讨论】:

    • 所以您的意思是,使用 Spring 云流,我无法将消费者绑定到具有路由键的特定预定义(非匿名)队列?
    • 我刚刚测试过,我们添加了第二个绑定;我认为这是一个错误 - 如果队列已经存在,我们不应该添加泛型(# 通配符绑定)。作为一种变通方法,您可以使用@RabbitListener 而不是@StreamListener(除非您依赖流侦听器进行转换)。我打开了and issue for this
    【解决方案4】:

    鼓励在 consumer 下使用此属性以使 rabbit 能够从现有队列中消费。请注意,队列名称将仅从组属性中选择,而不是从目标中选择。

    queueNameGroupOnly: 真

    例子:

    cloud:
    stream:
      # rabbit setting: https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
      rabbit:
        bindings:
          input:
            consumer:
              acknowledgeMode: AUTO
              bindingRoutingKey: DECISION_PERSISTENCE_KEY
              declareExchange: false
              bindQueue: false
              queueNameGroupOnly: true
              consumerTagPrefix: dpa-rabbit-consumer
      bindings:
        input:
          binder: rabbit
          group: DECISION_PERSISTENCE_QUEUE
          content-type: application/json
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-03-25
      • 2019-07-17
      • 1970-01-01
      • 2021-12-30
      • 1970-01-01
      • 2016-06-21
      相关资源
      最近更新 更多