【问题标题】:Consuming from RabbitMQ / MQTT with Spring Cloud Stream使用 Spring Cloud Stream 从 RabbitMQ / MQTT 消费
【发布时间】:2021-03-25 12:43:41
【问题描述】:
您如何使用通过 MQTT 发送到 RabbitMQ 的 Spring Cloud Stream(即使用 AMQP)消息消费?
使用 Rabbit 上的 MQTT,所有消息都位于名为“amq.topic”的交换中。
在消费者端,使用 Spring Cloud Stream,为每个目的地创建一个队列,并使用目的地名称进行类型为“topic”的交换;创建的队列绑定到交换器。
现在,
-
我无法手动将我的队列绑定到“amq.topic”,因为它是独占的:
无法获得对 vhost '/' 中锁定队列 '...' 的独占访问权限。它可能最初是在另一个连接上声明的,或者独占属性值与原始声明的值不匹配。
-
我不能直接从交换“amq.topic”中收听,因为,嗯,你必须听一个队列...
-
我创建了一个绑定到传递消息的“amq.topic”的“tmp”队列,但我不能将它用作目标,因为 RMQ 将创建一个名为“tmp.SOME_CLIENT_ID”的新队列,绑定到一个名为“ tmp”,这与我的“tmp”队列无关。
欢迎任何想法!
【问题讨论】:
标签:
rabbitmq
amqp
spring-cloud-stream
【解决方案1】:
见Using Existing Queues/Exchanges。
默认情况下,绑定器将自动提供一个主题交换,其名称源自目标绑定属性<prefix><destination> 的值。如果未提供,则目标默认为绑定名称。绑定消费者时,将自动配置名称为<prefix><destination>.<group> 的队列(如果指定了组绑定属性),或者在没有组时使用匿名自动删除队列。对于非分区绑定,队列将使用“match-all”通配符路由键 (#) 绑定到交换器,对于分区绑定,则使用 <destination>-<instanceIndex>。默认情况下,前缀为空字符串。如果使用 requiredGroups 指定了输出绑定,将为每个组提供一个队列/绑定。
有许多特定于兔子的绑定属性允许您修改此默认行为。
如果您希望使用现有的交换/队列,则可以完全禁用自动配置,假设交换名为 myExchange,队列名为 myQueue:
spring.cloud.stream.bindings.<binding name>.destination=myExhange
spring.cloud.stream.bindings.<binding name>.group=myQueue
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindQueue=false
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.declareExchange=false
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.queueNameGroupOnly=true
如果您希望活页夹提供队列/交换,但您想使用此处讨论的默认值以外的其他方式来执行此操作,请使用以下属性。有关详细信息,请参阅上面的属性文档。
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKey
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.exchangeType=<type>
spring.cloud.stream.rabbit.bindings.<binding name>.producer.routingKeyExpression='myRoutingKey'