【问题标题】:Query regarding Spring message-driven-channel-adapter关于 Spring 消息驱动通道适配器的查询
【发布时间】:2015-05-01 19:29:20
【问题描述】:

我正在使用 Spring 的消息驱动通道适配器。 我的组件正在使用来自 Tibco 主题的消息并发布到 RabbitMQ 主题

所以消息流如下: Tibco->(订阅者)组件(发布到)-> RabbitMQ

服务激活器如下所示:我们看到有一个输入通道和一个输出通道。 bean storeAndForwardActivator 将具有业务逻辑(在方法 createIssueOfInterestOratorRecord 中)

<int:service-activator input-channel="inboundOratorIssueOfInterestJmsInputChannel"
    ref="storeAndForwardActivator" method="createIssueOfInterestOratorRecord"
    output-channel="outboundIssueOfInterestRabbitmqOratorJmsOutputChannel" />

我还有一个 message=driven-channel-adapter。该适配器将在服务适配器被调用之前被调用。

<int-jms:message-driven-channel-adapter
    id="oratorIssueOfInterestInboundChannel" channel="inboundOratorIssueOfInterestJmsInputChannel"
    container="oratorIssueOfInterestmessageListenerContainer" />

即特别是容器(如下所示)将保存要使用的主题名称 - 这是 DefaultMessageListenerContainer

<bean id="oratorIssueOfInterestmessageListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
    <property name="destination" ref="oratorTibcojmsDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="concurrentConsumers" value="1" />
    <property name="receiveTimeout" value="5000" />
    <property name="recoveryInterval" value="60000" />
    <property name="autoStartup" value="true" />
    <property name="exposeListenerSession" value="false" />
    <property name="subscriptionDurable" value="true" />
    <property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
    <property name="messageSelector" value="${topic.orator.selector}" />
</bean>

此设置完美无缺。但是在某些情况下,我的消费者/组件会收到“流氓”消息。即一个空的有效负载或 HashMap 的消息类型(而不是纯 TextMessage) - 当我们得到这个时 - 我观察到的是 - 在 DefaultMessageListener 级别捕获了一个异常(即我没有像我的业务 bean 一样,即 storeAndForwardActivator ),因此我的组件没有发回 ACK - 因为这是一个持久的主题 - 在主题上有一个消息的构建 - 这是不可取的。 有没有办法让我立即确认消息,而不管天气如何,在 DefaultMessageListener 级别捕获异常?

或者我应该在 DefaultMessageListener 中引入错误处理程序吗? 处理这个问题的最佳方法是什么,有什么建议吗?

问候 D

更新:

我尝试将 errorHandler 添加到 org.springframework.jms.listener.DefaultMessageListenerContainer 如下图

<bean id="oratorIssueOfInterestmessageListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
    <property name="destination" ref="oratorTibcojmsDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="concurrentConsumers" value="1" />
    <property name="receiveTimeout" value="5000" />
    <property name="recoveryInterval" value="60000" />
    <property name="autoStartup" value="true" />
    <property name="exposeListenerSession" value="false" />
    <property name="subscriptionDurable" value="true" />
    <property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
    <property name="messageSelector" value="${topic.orator.selector}" />

    <property name="errorHandler" ref="myErrorHandler"/>
</bean>

myErrorHandler 是如下 shpwn 的 bean

<bean id="myErrorHandler"
    class="com.igate.firds.icmf.activators.concentrator.MyErrorHandler" />

MyErroHandler 实现 ErrorHandler

 @Service
 public class MyErrorHandler implements ErrorHandler{

private static Log log = LogFactory.getLog(MyErrorHandler.class);

@Override
   public void handleError(Throwable t) {

        if (t instanceof MessageHandlingException) {
            MessageHandlingException exception = (MessageHandlingException) t;
            if (exception != null) {
                org.springframework.messaging.Message<?> message = exception.getFailedMessage();
                Object payloadObject = message.getPayload();
                if (null != payloadObject) {
                    log.info("Payload  is not null, type is: " + payloadObject.getClass());
                }
            }
        } else {
            log.info("Exception is not of type: MessageHandlingException ");
        }
}

}

我注意到异常被捕获(当订阅者使用流氓消息时)。我一直在循环中看到这个日志

    Exception is not of type: MessageHandlingException 
    Exception is not of type: MessageHandlingException 
    Exception is not of type: MessageHandlingException 

即由于事务未提交 - 来自持久主题的相同消息被一次又一次地消耗。我的目标是在使用消息后将 ACK 发送回代理(无论天气如何,是否捕获到异常)。

我明天会尝试错误通道。

问候 D

【问题讨论】:

    标签: error-handling spring-jms message-listener durable-subscription


    【解决方案1】:

    在消息驱动适配器中添加error-channelErrorMessage 将包含一个具有两个字段的 MessagingException 有效负载; cause(例外)和failedMessage

    如果您使用默认的error-channel="errorChannel",则会记录异常。

    如果您想做更多的事情,您可以配置自己的错误通道并添加一些流程。

    编辑:

    下面还有你的 cmets...

    payload must not be null 不是堆栈跟踪;这是一条消息。

    也就是说,payload must not be null 看起来像是一条 Spring Integration 消息;它可能在消息转换期间被抛出在消息侦听器适配器中,这是在我们到达失败可以转到error-channel 的地步之前;这样的异常会被扔回容器中。

    打开 DEBUG 日志并查找此日志条目:

    logger.debug("converted JMS Message [" + jmsMessage + "] to integration Message payload [" + result + "]");
    

    另外,提供完整的堆栈跟踪。

    编辑#2

    所以,我通过在自定义 MessageConverter 中强制转换后的有效负载为空来重现您的问题。

    DMLC 错误处理程序在事务回滚后由容器调用,因此无法停止回滚。

    我们可以向适配器添加一个选项来以不同的方式处理此类错误,但这需要一些工作。

    与此同时,一种解决方法是编写自定义MessageConverter;类似于this Gist 中的那个。

    然后,您的服务将不得不处理“收到错误消息”负载。

    然后您提供这样的自定义转换器...

    <jms:message-driven-channel-adapter id="jmsIn"
            destination="requestQueue" acknowledge="transacted"
            message-converter="converter"
            channel="jmsInChannel" />
    
    <beans:bean id="converter" class="foo.MyMessageConverter" />
    

    【讨论】:

    • 嗨,Gary,我注意到格式错误消息的异常被捕获在容器部分,即 bean id="oratorIssueOfInterestmessageListenerContainer"。这就是为什么我尝试添加自定义错误处理程序(如上所示)。但似乎异常被“优雅地”捕获但事务未提交,即没有向代理发送确认,并且由于它是一个持久主题 - 一次又一次地消耗相同的消息并且它进入一个循环。处理异常并将 ACK 发送回 Broker 的最佳方法是什么?
    • 你需要显示堆栈跟踪;包含 HashMap 的 ObjectMessage 不应“格式错误”。你不应该使用ErrorHandler 来处理MessageHandlingException;如我所说,此类异常应在error-channel 中处理。
    • 你是对的! HashMap 是一个红鲱鱼。我放在 MessageListenerContainer 下的错误处理程序显示以下堆栈跟踪:有效负载不能为空。所以看起来我得到了一个带有空有效负载的 JMS - 因为我使用以下消息选择器: - 有效负载为空/空- 在 MessageListenerContainer 中抛出异常(并被 errorHandler 捕获) - 我认为这就是正在发生的事情。
    • 我在 message-driven-channel-adapter 下引入了一个 error-channel="errorChannel" 但根本没有调用 error-channel,因为容器本身捕获了异常!即使有一个空的有效负载,我也需要找到一种方法来回复代理。
    • 嗨,Gary,请查看此链接:forum.spring.io/forum/spring-projects/integration/… 我想知道 Tibco 是否也有同样的情况?我看到了类似的行为,即 - 消息没有负载(IBM MQ 标头:内容 = nil)JMS 侦听器抛出一个不会传递到错误通道的异常,而是将事务回滚,将消息留在队列中(立即和无限重试)。 - 消息的有效负载为空。JMS 侦听器或 SI Base Transformer 会抛出一个异常,该异常确实会传递到错误通道。
    猜你喜欢
    • 2014-01-23
    • 1970-01-01
    • 2017-04-29
    • 2021-03-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-04-29
    相关资源
    最近更新 更多