【问题标题】:Message store for persistence delivering to AMQP broker in Spring Integration在 Spring Integration 中用于持久性传递到 AMQP 代理的消息存储
【发布时间】:2021-04-01 14:26:49
【问题描述】:

我正在尝试构建集成流,这将防止消息在传递到 AMQP 代理 (rabbitMQ) 期间丢失。 在代理停止的情况下,我看到了一些令我意想不到的行为:

  1. 失败的消息正在保存到消息存储中,但不会保存很长时间。此流程不会等待代理可用,它会从消息存储中提取消息,即使代理仍处于停止状态
  2. 如果成功重新启动 rabbitmq,消息存储中的记录(如果它们仍然存在)不会被传递到队列。

请帮助我进行调查。代码示例:

 @Bean
public MessageChannel messageStoreBackedChannel() {
    return new QueueChannel(
            new MessageGroupQueue(jdbcChannelMessageStore(), "Group_ID")
    );
}

 @Bean
public IntegrationFlow someFlow() {
    return IntegrationFlows
            .from("messageStoreBackedChannel")
            .channel("amqpMessageChannel")
            .get();
}

@Bean
public IntegrationFlow jmsExtractFlow(EntityManagerFactory entityManagerFactory) {
    return IntegrationFlows
            .from("amqpMessageChannel")
            .handle(message -> System.out.println(message.getPayload()))
            .get();
}


@Bean
public MessageChannel amqpMessageChannel() {
    return new PollableAmqpChannel("amqpMessageChannel", amqpTemplate);
}

@Bean
public JdbcChannelMessageStore jdbcChannelMessageStore() {
    var jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
    jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());

    return jdbcChannelMessageStore;
}

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setTrigger(new PeriodicTrigger(10));
    return pollerMetadata;
}

【问题讨论】:

    标签: spring spring-integration amqp spring-integration-dsl


    【解决方案1】:

    考虑将.from("messageStoreBackedChannel").channel("amqpMessageChannel") 之间的端点配置为transactional()

    类似这样的:

    .from("messageStoreBackedChannel")
    .bridge(e -> e.poller(p -> p.fixedDelay(10).transactional()))
    .channel("amqpMessageChannel")
    

    因此,每当传递到amqpMessageChannel 失败时,事务将回滚,失败的消息将返回到存储区,直到下一次轮询。

    当然,当您在连接到 RabbitMQ 时遇到错误时,您可以停止 bridge 端点。但是你怎么能确定那个连接又回来了呢?..

    【讨论】:

    • .bridge(e -> e.transactional()) - 没有改变我的情况的行为。第一次不成功的重试(org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'amqpMessageChannel')从消息存储中删除一条记录。在从存储中删除记录之前,我们是否有机会进行多次此类重试?
    • 哦!等待。试试这个:.bridge(e -> e.poller(p -> p.fixedDelay(10).transactional()))。轮询器必须是事务性的
    • 是的,它解决了我的问题。非常感谢,它节省了我的时间!
    • 我还有一个关于这个话题的问题,拜托。 spring 集成是否有任何工具来配置消息存储中消息的持久性(例如,它会在一段时间后或在尝试重新发送一些次数后被删除)
    • 由于并非所有持久存储都提供自动生存时间,因此我们在框架中没有这样的功能可以依赖。但是,我们可以考虑添加类似于MessageGroupStoreReaper 的内容,但在组中的每条消息
    猜你喜欢
    • 1970-01-01
    • 2012-09-30
    • 2011-09-01
    • 1970-01-01
    • 2016-01-07
    • 2016-06-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多