【问题标题】:spring integration dsl configure idempotent receiver to identify duplicatesspring integration dsl配置幂等接收器识别重复
【发布时间】:2018-05-17 22:42:31
【问题描述】:

我在使用队列中的消息时使用 DUPS_OK_ACKNOWLEDGE 模式,我需要检测重复项并忽略它们。

.from(Jms.messageDrivenChannelAdapter(activeMQConnectionFactory)
                        .destination(sourceQueue)
                        .configureListenerContainer(spec -> {
                            spec.sessionTransacted(false);
                            spec.sessionAcknowledgeMode(Session.DUPS_OK_ACKNOWLEDGE);
                        }))
                .transform(orderTransformer)
                .handle(orderService, "save")
                .get();

我有一个幂等接收器建议。

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    IdempotentReceiverInterceptor idempotentReceiverInterceptor = new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
            (String) m.getHeaders().get("JMSMessageId")));
    idempotentReceiverInterceptor.setDiscardChannelName("ignoreDuplicates");
    idempotentReceiverInterceptor.setThrowExceptionOnRejection(false);
    return idempotentReceiverInterceptor;
}

我被两件事困住了

  1. 如何在 Jms.messageDrivenChannelAdapter?
  2. 如果我需要将元数据存储在 oracle/mysql 中,该表与任何示例链接的外观如何

【问题讨论】:

    标签: spring-integration spring-integration-dsl


    【解决方案1】:

    Idempotent Receiver 绝对是 consumer 的问题,而不是 producer 的问题,即Jms.messageDrivenChannelAdapter()。当然是在 Spring Integration 方面。

    如果您不想将重复项传递给下游,则需要在Jms.messageDrivenChannelAdapter() 之后在消费者上配置这样的Advice。在您的情况下,它是.transform(orderTransformer)。因此,代码可能如下所示:

    .transform(orderTransformer, e -> e.advice(idempotentReceiverInterceptor()))
    

    oracle/mysql MetadataStore 在这里 - JdbcMetadataStore5.0: https://docs.spring.io/spring-integration/docs/5.0.5.RELEASE/reference/html/jdbc.html#jdbc-metadata-store

    【讨论】:

    • 拥有.handle(idempotentReceiverInterceptor()).transform(orderTransformer)和你的建议.transform(orderTransformer, e -> e.advice(idempotentReceiverInterceptor()))有什么区别
    • ??? IdempotentReceiverInterceptor 不是 MessageHandler。这是一个 AOP Advice。因此它只能应用于其他一些消息处理组件。是什么让你有不同的想法? docs.spring.io/spring-integration/docs/5.0.5.RELEASE/reference/…
    • 我现在明白了,让我想到的原因是.transform(orderTransformer, e -> e.advice(idempotentReceiverInterceptor()))给出了编译错误,我花了一段时间才把它修复到.transform(orderTransformer, "methodName", e -> e.advice(idempotentReceiverInterceptor()))
    • 是的...您的代码不清楚orderTransformer 是什么类型。很高兴看到您找到了解决方案。
    • 感谢我也能够为我的 ms-sql 设置元数据存储