【发布时间】:2023-09-06 01:52:01
【问题描述】:
我正在尝试设置一个将消息发布到 RabbitMQ 的集成工作流。
我对此有 2 个问题: 1. 我的队列 Bean 是否像我希望的那样工作 :) 2. 如何使用集成 DSL 通过 outbound-amqp-adapter 设置消息的优先级?
@Configuration
public class RabbitConfig {
@Autowired
private ConnectionFactory rabbitConnectionFactory;
@Bean
TopicExchange worksExchange() {
return new TopicExchange("work.exchange", true, false);
}
@Bean
Queue queue() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
return new Queue("dms.document.upload.queue", true, false, false, args);
}
@Bean
public RabbitTemplate worksRabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory);
template.setExchange("work.exchange");
template.setRoutingKey("work");
template.setConnectionFactory(rabbitConnectionFactory);
return template;
}
@Configuration
public class WorksOutbound {
@Autowired
private RabbitConfig rabbitConfig;
@Bean
public IntegrationFlow toOutboundQueueFlow() {
return IntegrationFlows.from("worksChannel")
.transform(Transformers.toJson())
.handle(Amqp.outboundAdapter(rabbitConfig.worksRabbitTemplate()))
.get();
}
}
更新 在能够使用适当的“优先级标头”推送消息之后,我可以使用 Rabbit Management UI 根据它们的优先级拉取消息,但我无法使用 spring-amqp 消费者正确拉取它们......
@Bean
public SimpleMessageListenerContainer workListenerContainer() {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(rabbitConnectionFactory);
container.setQueues(worksQueue());
container.setConcurrentConsumers(2);
container.setDefaultRequeueRejected(false);
return container;
}
【问题讨论】:
标签: spring-amqp