【发布时间】:2021-04-01 14:26:49
【问题描述】:
我正在尝试构建集成流,这将防止消息在传递到 AMQP 代理 (rabbitMQ) 期间丢失。 在代理停止的情况下,我看到了一些令我意想不到的行为:
- 失败的消息正在保存到消息存储中,但不会保存很长时间。此流程不会等待代理可用,它会从消息存储中提取消息,即使代理仍处于停止状态
- 如果成功重新启动 rabbitmq,消息存储中的记录(如果它们仍然存在)不会被传递到队列。
请帮助我进行调查。代码示例:
@Bean
public MessageChannel messageStoreBackedChannel() {
return new QueueChannel(
new MessageGroupQueue(jdbcChannelMessageStore(), "Group_ID")
);
}
@Bean
public IntegrationFlow someFlow() {
return IntegrationFlows
.from("messageStoreBackedChannel")
.channel("amqpMessageChannel")
.get();
}
@Bean
public IntegrationFlow jmsExtractFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlows
.from("amqpMessageChannel")
.handle(message -> System.out.println(message.getPayload()))
.get();
}
@Bean
public MessageChannel amqpMessageChannel() {
return new PollableAmqpChannel("amqpMessageChannel", amqpTemplate);
}
@Bean
public JdbcChannelMessageStore jdbcChannelMessageStore() {
var jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return jdbcChannelMessageStore;
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(10));
return pollerMetadata;
}
【问题讨论】:
标签: spring spring-integration amqp spring-integration-dsl