【问题标题】:Spring Integration JMS with JTA rollback when message goes to errorChannel当消息进入errorChannel时,带有JTA回滚的Spring集成JMS
【发布时间】:2016-10-28 10:05:46
【问题描述】:

我通过 Atomikos 和 JMS 使用带有 JTA 支持的 Spring Integration,这些 JMS 绑定到入站和出站的不同 Webshpere MQ。 流程如下:

  • JMS 入站通道适配器接收消息
  • 一些转换
  • 输出队列的 JMS 出站通道适配器
  • 发生错误时,errorChannel 会收到消息
  • 异常类型路由器将未处理的错误路由到自定义重新抛出服务,并将已处理错误路由到收件人列表路由器,该路由器将它们发送到 2 个错误队列

我的问题是,即使消息传递到 errorChannel 的下游(在已处理的异常情况和错误队列中),我也希望提交事务。 据我了解,只有在重新抛出异常时才会发生回滚(这就是我重新抛出未处理的异常的原因),但在我的情况下,一旦消息到达 errorChannel,事务就会回滚(在它被路由到其他地方之前)。

我做错了什么?

配置如下。

<jms:inbound-channel-adapter id="jms-in"
                             destination="input-queue"
                             connection-factory="inConnectionFactory"
                             channel="edi-inbound"
                             acknowledge="transacted">
    <poller max-messages-per-poll="${process.jms.inbound.poll.messages-per-poll:1}"
            fixed-rate="${process.jms.inbound.poll.rate-millis:60000}"
            >
        <transactional timeout="${process.tx.timeout-sec:60}"/>
    </poller>
</jms:inbound-channel-adapter>
<channel id="edi-inbound"/>

<chain input-channel="edi-inbound" output-channel="edi-transformation-chain">
    <object-to-string-transformer/>
    <service-activator ref="inbound" method="service"/>
</chain>

<!-- edifact transformation flow -->
<chain input-channel="edi-transformation-chain" output-channel="outbound-message-compose">
    <transformer ref="edi2xml-converter"/>
    <transformer ref="xml-mapper"/>
</chain>




<chain input-channel="outbound-message-compose" output-channel="outbound-channel">
    <service-activator ref="outbound-message-composer" />
</chain>

<channel id="outbound-channel">
    <interceptors>
        <beans:ref bean="outbound-interceptor" />
    </interceptors>
</channel>

<recipient-list-router input-channel="outbound-channel">
    <recipient channel="file-outbound"/>
    <recipient channel="queue-outbound"/>
</recipient-list-router>



<channel id="queue-outbound"/>
<jms:outbound-channel-adapter id="jms-out" destination="output-queue" channel="queue-outbound" connection-factory="outConnectionFactory"/>



<channel id="file-outbound"/>
<file:outbound-channel-adapter id="file-outbound"
                                   directory="${output.directory}"
                                   filename-generator-expression="headers['${application.header.key.messageid}'] + '_' + new java.util.Date().getTime() + '.xml'"
                                   delete-source-files="true" />




<!-- notification outbound flow -->
<channel id="errorChannel">
    <interceptors>
        <wire-tap channel="logger"/>
    </interceptors>
</channel>
<logging-channel-adapter id="logger" level="INFO"/>

<exception-type-router input-channel="errorChannel" default-output-channel="unhandled-error-channel">
    <mapping exception-type="aero.aice.apidcm.integration.exception.HandledException" channel="error-notification-channel" />
</exception-type-router>

<recipient-list-router input-channel="error-notification-channel">
    <recipient channel="queue-outbound-error"/>
    <recipient channel="queue-inbound-error"/>
</recipient-list-router>

<chain input-channel="queue-outbound-error">
    <service-activator ref="outbound-error-composer" />
    <jms:outbound-channel-adapter id="jms-out-error"
                                  destination="error-output-queue"
                                  connection-factory="outConnectionFactory"
                                  session-transacted="true"/>
</chain>

<chain input-channel="queue-inbound-error">
    <service-activator ref="error-notif-composer" />
    <jms:outbound-channel-adapter id="jms-in-error"
                                  destination="error-input-queue"
                                  connection-factory="outConnectionFactory"
                                  session-transacted="true"/>
</chain>


<channel id="unhandled-error-channel" />
<service-activator ref="exception-rethrow" input-channel="unhandled-error-channel"/>

顺便说一句,当错误通道上的 tx 回滚时,两个错误队列在任何情况下都接收到消息(好像出站适配器不会参与事务),而 tx 为正常流(当没有发生错误)完美运行。

【问题讨论】:

    标签: spring spring-integration jta spring-jms


    【解决方案1】:

    没错。

    因为你使用Polling Inbound Channel Adapter。它的逻辑是这样的:

    AbstractPollingEndpoint.this.taskExecutor.execute(() -> {
      ...
                        if (!Poller.this.pollingTask.call()) {
                            break;
                        }
       ...
                    catch (Exception e) {
                        if (e instanceof RuntimeException) {
                            throw (RuntimeException) e;
                        }
                        else {
                            throw new MessageHandlingException(new ErrorMessage(e), e);
                        }
                    }
                }
            });
    

    您的 TX 是 pollingTask 代理的一部分,作为 AOP TransactionInterceptor

    errorChannelthis.taskExecutorErrorHandler 的一部分。 因此,只有当我们从pollingTask 抛出异常时,我们才能到达errorChannel。既然我们在那里有 TX,它当然会被回调。

    我的观点是:Polling Inbound Channel Adapter 中的错误处理过程是在 TX 之外完成的。

    考虑切换到&lt;int-jms:message-driven-channel-adapter&gt;

    【讨论】:

    • 我曾计划在某个时候切换到消息驱动通道适配器,主要用于生产环境,但我希望我可以使用入站通道适配器和轮询器获得相同的结果。你认为有什么方法可以将错误处理过程带入 TX,即使是 poller 版本?
    • 嗯,实际上 JMS 是流式协议,所以总是听目的地真的很自然。当我们需要 JMS 的轮询适配器时,这是罕见的极端情况
    • 因此,或者,我可以配置消息驱动适配器以“缓慢”地使用消息。我的意思是,我可以决定消费率吗?
    • 好吧,你可以玩concurrency选项,直到当前完成后才处理下一条消息,但是,我不明白为什么要降低从JMS读取消息的速度...
    • 在开发环境中,我有一个填充的入站队列,其中包含有限的消息(并且我对其的控制有限)。所以,为了在不清空队列的情况下进行测试,我需要控制消费。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-03-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-06-03
    相关资源
    最近更新 更多