【问题标题】:Set message priority Spring Integration DSL设置消息优先级 Spring Integration DSL
【发布时间】:2023-09-06 01:52:01
【问题描述】:

我正在尝试设置一个将消息发布到 RabbitMQ 的集成工作流。

我对此有 2 个问题: 1. 我的队列 Bean 是否像我希望的那样工作 :) 2. 如何使用集成 DSL 通过 outbound-amqp-adapter 设置消息的优先级?

 @Configuration

public class RabbitConfig {

  @Autowired
  private ConnectionFactory rabbitConnectionFactory;

  @Bean
  TopicExchange worksExchange() {
    return new TopicExchange("work.exchange", true, false);
  }


  @Bean
  Queue queue() {
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-max-priority", 10);
    return new Queue("dms.document.upload.queue", true, false, false, args);
  }

  @Bean
  public RabbitTemplate worksRabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory);
    template.setExchange("work.exchange");
    template.setRoutingKey("work");
    template.setConnectionFactory(rabbitConnectionFactory);
    return template;
  }


@Configuration
public class WorksOutbound {

    @Autowired
    private RabbitConfig rabbitConfig;

    @Bean
    public IntegrationFlow toOutboundQueueFlow() {
        return IntegrationFlows.from("worksChannel")
                .transform(Transformers.toJson())
                .handle(Amqp.outboundAdapter(rabbitConfig.worksRabbitTemplate()))
                .get();
    }

}

更新 在能够使用适当的“优先级标头”推送消息之后,我可以使用 Rabbit Management UI 根据它们的优先级拉取消息,但我无法使用 spring-amqp 消费者正确拉取它们......

  @Bean
  public SimpleMessageListenerContainer workListenerContainer() {
    SimpleMessageListenerContainer container =
      new SimpleMessageListenerContainer(rabbitConnectionFactory);
    container.setQueues(worksQueue());
    container.setConcurrentConsumers(2);
    container.setDefaultRequeueRejected(false);    
    return container;
  }

【问题讨论】:

    标签: spring-amqp


    【解决方案1】:
    1. 看起来不错。

    2. .handle() 之前,使用.enrichHeaders(...) 标头名称为IntegrationMessageHeaderAccessor.PRIORITY 和一个整数值。

    编辑

    @SpringBootApplication
    public class So49692361Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So49692361Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(SimpleMessageListenerContainer container, ApplicationContext ctx) {
            return args -> {
                Gate gate = ctx.getBean(Gate.class);
                gate.send(new GenericMessage<>("foo", Collections.singletonMap("foo", 1)));
                gate.send(new GenericMessage<>("bar", Collections.singletonMap("foo", 2)));
                container.start();
            };
        }
    
        @Bean
        public static IntegrationFlow flow(AmqpTemplate amqpTemplate) {
            return IntegrationFlows.from(Gate.class)
                .enrichHeaders(h -> h.headerExpression(IntegrationMessageHeaderAccessor.PRIORITY,
                        "headers.foo"))
                    .handle(Amqp.outboundAdapter(amqpTemplate).routingKey("so49692361"))
                .get();
        }
    
        @Bean
        public Queue queue() {
            return new Queue("so49692361", true, false, false, Collections.singletonMap("x-max-priority", 5));
        }
    
        @Bean
        public SimpleMessageListenerContainer container(ConnectionFactory cf) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
            container.setQueues(queue());
            container.setMessageListener(m -> {
                System.out.println(m);
            });
            container.setAutoStartup(false);
            return container;
        }
    
        public interface Gate {
    
            public void send(Message<?> message);
    
        }
    
    }
    

    (Body:'bar' MessageProperties [headers={errorChannel=, foo=2, priority=2}, ...    
    (Body:'foo' MessageProperties [headers={errorChannel=, foo=1, priority=1}, ...
    

    【讨论】:

    • 好的,这将为所有消息全局设置优先级,但我需要能够在每条消息上设置它,具体取决于paylad。在消息发布之前,也许可以使用某种拦截器......
    • 好的,我开始阅读您在 [link] *.com/questions/27314651/….. 的帖子。现在我正在尝试使用 SimpleMessageListenerContainer 中的优先消息(请参阅问题中的更新..)。 . 问候韩国
    • 要回答您的第一条评论,您可以使用标题表达式f.enrichHeaders(h -&gt; h.headerExpression(IntegrationMessageHeaderAccessor.PRIORITY, "payload.foo"))。我看不到容器如何以与管理 UI 不同的顺序获取消息。
    • 我刚刚测试了它,它对我来说很好;查看我的编辑。