【问题标题】:Rabbitmq Delayed messaging is not working with spring cloud streamRabbitmq 延迟消息传递不适用于 Spring Cloud Stream
【发布时间】:2019-08-14 03:06:36
【问题描述】:

我正在尝试在 Spring Cloud Stream 中实现延迟消息(不使用 rabbitmq 插件),但它不起作用

我使用 spring-boot 实现了它,它运行良好。下面是我在 spring-boot 中做的示例代码。

Delayed message in RabbitMQ

我正在尝试在 spring-cloud-stream 中做同样的事情,但没有帮助。以下是属性。

输出通道 - 生产者

spring.cloud.stream.bindings.output.destination=temp-channel
spring.cloud.stream.bindings.output.group=temp-channel-group

输入通道 - 消费者

spring.cloud.stream.bindings.input.destination=temp-channel
spring.cloud.stream.bindings.input.group = temp-channel-group

spring.cloud.stream.bindings.input.consumer.exchange-type=direct
spring.cloud.stream.bindings.input.consumer.bind-queue=true
spring.cloud.stream.bindings.input.consumer.binding-routing- 
key=foo.bar.key
spring.cloud.stream.bindings.input.consumer.required-groups=final- 
channel-group-1
spring.cloud.stream.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.bindings.input.consumer.dlq-dead-letter- 
exchange=final-channel-1
spring.cloud.stream.bindings.input.consumer.dlq-dead-letter-queue=final- 
channel-group-1
spring.cloud.stream.bindings.input.consumer.dlq-dead-routing- 
key=foo.bar.key

当使用spring-boot实现时,我看到消息在临时队列中停留了指定的时间,然后移动到最终队列,我想用spring cloud stream实现同样的效果。任何意见将不胜感激。

【问题讨论】:

  • 那么这个 prop 发生了什么,final-channel-1 Exchange 和 final-channel-group-1 Queue 已经创建并绑定了? x-dead-letter-exchangex-dead-letter-routing-keyx-message-ttl 是否配置在 Queue 的 arg 中?

标签: spring-boot rabbitmq spring-cloud-stream


【解决方案1】:

你不应该从临时通道消费。

重点是你生产一个带有 TTL 的队列,TTL 过期并将消息路由到 DLQ;然后你从 DLQ 消费。

消费者没有required-groups,生产者有。

这是你需要的:

制作人:

spring.cloud.stream.bindings.output.destination=temp-exchange
spring.cloud.stream.bindings.output.producer.required-groups=delayed-group
spring.cloud.stream.rabbit.bindings.output.producer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.output.producer.ttl=5000
spring.cloud.stream.rabbit.bindings.output.producer.dead-letter-exchange=final-dest
spring.cloud.stream.rabbit.bindings.output.producer.dead-letter-queue-name=final-dest.delayed-group

消费者:

spring.cloud.stream.bindings.input.destination=final-dest
spring.cloud.stream.bindings.input.group=delayed-group

这将创建这些队列:

temp-exchange.delayed-group

    x-dead-letter-exchange: final-dest
    x-dead-letter-routing-key:  temp-exchange.delayed-group
    x-message-ttl:  5000
    durable:    true

final-dest.delayed-group

【讨论】:

  • 感谢 Gary 提供详细信息。我确实尝试了您的建议,但它没有按预期工作。它只是创建“队列 final-dest.anonymous.2m9ib1fIS82rTSAIcKLegg” x-queue-master-locator: client-local Exclusive: true auto-delete: true
  • 对于没有组的消费者来说,这是一个匿名队列。您必须添加一个消费者组才能获得正确的命名 - 请参阅我的答案。带 TTL 的临时队列由生产者端创建;通过所需的组属性。关注letter-queue-name=final-dest.delayed-group。那是队列命名约定<destination>.<group>
  • 嗨,Russell,我正在尝试输出、输入声明,但没有成功。今天我了解了最新的api Source,Sink并尝试了它,它起作用了。非常感谢。
猜你喜欢
  • 1970-01-01
  • 2014-09-27
  • 1970-01-01
  • 2021-05-07
  • 1970-01-01
  • 2018-01-24
  • 2022-12-14
  • 1970-01-01
  • 2011-05-25
相关资源
最近更新 更多