【问题标题】:Apache Camel JMS Component: bridgeErrorHandler does not work as documentedApache Camel JMS 组件:bridgeErrorHandler 不像记录的那样工作
【发布时间】:2021-06-21 23:10:55
【问题描述】:

问题

您好,我尝试桥接从 JMS 消费者到我的骆驼路线的错误处理程序,这是一个 DLC,使用标志 bridgeErrorHandler=trueJMSException 发生在我消费的消息从未由 DLC ErrorHandler 处理,而是由 DefaultSpringErrorHandler 处理,因此只会被记录和忽略。

在我的 DLC 中,我已经配置了当异常发生时应该执行额外的步骤,现在可悲的是忽略了这些步骤。 也许你能告诉我我做错了什么?

以下是详细信息:

项目设置

我正在使用:

  • spring-boot:2.1.0.RELEASE
  • 阿帕奇骆驼:2.23.3
  • camel-jms-starter:2.23.3

路线:

@Component
public class BridgeErrorHandlerTestQuellRoute extends RouteBuilder{

    private static final String ROUTE_ALIAS = "BridgeErrorHandlerTestQuellRoute";

    public static final String ENDPOINT_QUELLE = "{{BridgeErrorHandlerTestQuelle}}";

    @Override
    public void configure() {
        errorHandler(deadLetterChannel(DLCRoute.ENDPOINT_DLC));

        // @formatter:off
        from(ENDPOINT_QUELLE).routeId(ROUTE_ALIAS)
            .removeHeaders("*", "JMS*") // Entfernt alle nicht notwendigen JMS-Header
            .log(INFO, log, "Start Route: ${routeId}")
            .to("mock:mymock")
            .log(INFO, log, "Ende Route: ${routeId}")
        // @formatter:on
    }
}

配置:

端点本身在application.properties 中定义(不过我不得不更改主题名称):

topic.props=subscriptionShared=true&\
            transacted=true&\
            recoveryInterval=-1&\
            connectionFactory=#connectionFactory&\
            bridgeErrorHandler=true

BridgeErrorHandlerTestQuelle=jms:topic:IBMMQSERVER/AT/ALL/MYTOPIC?durableSubscriptionName=BridgeErrorHandlerTest.bridgeerrorhandlertest&${topic.props}

connectionfactory bean

connectionfactory 用于 IBM MQ:

    @Bean
    public ConnectionFactory connectionFactory() {
        try {
            MQConnectionFactory factory = new MQConnectionFactory();
            factory.setHostName(properties.getHost());
            factory.setPort(properties.getPort());
            factory.setQueueManager(properties.getQueueManager());
            factory.setChannel(properties.getChannel());
            factory.setStringProperty(WMQConstants.USERID, properties.getUser());
            factory.setStringProperty(WMQConstants.PASSWORD, properties.getPassword());
            factory.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
            return factory;
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

异常

异常发生在: com.ibm.msg.client.wmq.common.internal.WMQUtils#computeTextFromBytes(byte[], int, int, com.ibm.mq.jmqi.system.JmqiCodepage)DetailedJMSException

WARN  EndpointMessageListener        : Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - Failed to extract body due to: com.ibm.msg.client.jms.DetailedJMSException: JMSCMQ1049: Mit dem Zeichensatz '1208(UTF-8) Unmappable Action: REPORT Unmappable Replacement: 63' kann die Zeichenfolge '[B@22b4f934' gar nicht oder nur teilweise konvertiert werden.
Es wurde versucht, Zeichenfolgedaten mit einem Zeichensatz zu versenden bzw. zu empfangen, der den Inhalt der Zeichenfolge nicht umsetzen kann.
Codieren Sie eine Nachricht nur mit einem Zeichensatz, von dem bekannt ist, dass er für die zu übertragenden Zeichenfolgedaten geeignet ist.. Message: 
[removed due to sensitive details]
org.apache.camel.RuntimeCamelException: Failed to extract body due to: com.ibm.msg.client.jms.DetailedJMSException: JMSCMQ1049: Mit dem Zeichensatz '1208(UTF-8) Unmappable Action: REPORT Unmappable Replacement: 63' kann die Zeichenfolge '[B@22b4f934' gar nicht oder nur teilweise konvertiert werden.
Es wurde versucht, Zeichenfolgedaten mit einem Zeichensatz zu versenden bzw. zu empfangen, der den Inhalt der Zeichenfolge nicht umsetzen kann.
Codieren Sie eine Nachricht nur mit einem Zeichensatz, von dem bekannt ist, dass er für die zu übertragenden Zeichenfolgedaten geeignet ist.. Message: 
  [removed due to sensitive details]
    at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:176)
    at org.apache.camel.component.jms.JmsMessage.createBody(JmsMessage.java:227)
    at org.apache.camel.impl.MessageSupport.getBody(MessageSupport.java:54)
    at org.apache.camel.component.bean.AbstractBeanProcessor.process(AbstractBeanProcessor.java:132)
    at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:53)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:113)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.ibm.msg.client.jms.DetailedJMSException: JMSCMQ1049: Mit dem Zeichensatz '1208(UTF-8) Unmappable Action: REPORT Unmappable Replacement: 63' kann die Zeichenfolge '[B@22b4f934' gar nicht oder nur teilweise konvertiert werden.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.ibm.msg.client.commonservices.j2se.NLSServices.createException(NLSServices.java:319)
    at com.ibm.msg.client.commonservices.nls.NLSServices.createException(NLSServices.java:226)
    at com.ibm.msg.client.wmq.common.internal.WMQUtils.computeTextFromBytes(WMQUtils.java:382)
    at com.ibm.msg.client.wmq.common.internal.WMQUtils.computeTextFromByteBuffer(WMQUtils.java:421)
    at com.ibm.msg.client.wmq.common.internal.messages.WMQTextMessage.getText(WMQTextMessage.java:240)
    at com.ibm.msg.client.jms.internal.JmsTextMessageImpl.getText(JmsTextMessageImpl.java:205)
    at com.ibm.jms.JMSTextMessage.getText(JMSTextMessage.java:124)
    at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:161)
    ... 22 common frames omitted
Caused by: java.nio.charset.MalformedInputException: Input length = 1
    at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
    at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:816)
    at com.ibm.mq.jmqi.system.JmqiCodepage.bytesToString(JmqiCodepage.java:745)
    at com.ibm.msg.client.wmq.common.internal.WMQUtils.computeTextFromBytes(WMQUtils.java:375)
    ... 27 common frames omitted

【问题讨论】:

    标签: java spring-boot apache-camel


    【解决方案1】:

    好的,所以我们正在为 Camel 3 改进这一点,您可以在其中打开 eagerLoadingOfProperties(组件级别或每个端点),然后将检测到此类 JMS 有效负载错误,并使用异常消息设置消息正文;您可以配置的,以及在 Exchange 上导致的异常。这允许 Camel 的路由错误处理程序将其检测为异常并路由到死信通道。

    查看 JIRA 票证:https://issues.apache.org/jira/browse/CAMEL-14083

    【讨论】:

      【解决方案2】:

      之后

      • 大量调试和
      • 尝试路由配置的不同“星座”(全局onException 和特定路由onException-definitions)

      我发现使用 camel-jms-starter:2.23.xcamel-jms-starter:2.24.x 问题无法解决 以编程方式,因为我想保留 transacted=true 标志。我认为 camel-jms-starter 中的 只是一个错误,发生的异常没有由我配置的 Route-ErrorHandler 处理,但我无法更改。

      所以我目前的解决方案是在链接的MQ-Broker中建立一个Poison-Message机制。这样我就可以摆脱无限循环,并且可以将消息路由到另一个队列。

      编辑
      尽管我不太喜欢它,但有一个解决方案可以内置到路线中。如果我将自定义处理器添加到在正文转换期间捕获异常的路由,我配置的 errorHandler 将被触发,并且因为它是 DLC 错误处理程序,消息被成功使用并且我没有无限循环。注意:使用 convertBodyTo 会导致与问题中提到的相同的意外行为,因为它使用相同的 API。 处理器工作是因为在此之前交换的主体还没有被触及。

      代码如下:
      更改后的路线:

          @Override
          public void configure() {
              errorHandler(deadLetterChannel(DLCRoute.ENDPOINT_DLC));
      
              // @formatter:off
              from(ENDPOINT_QUELLE).routeId(ROUTE_ALIAS)
                  .removeHeaders("*", "JMS*")
                  .process(myCustomConverter)
                  .log(INFO, log, "Start Route: ${routeId}")
                  .to("mock:mymock")
                  .log(INFO, log, "Ende Route: ${routeId}")
              // @formatter:on
          } 
      

      这是myCustomConverter bean(带有虚拟实现):

      @Component
      public class MyCustomConverter implements Processor {
      
          private static final Logger LOG = LoggerFactory.getLogger(MyCustomConverter.class);
          @Override
          public void process(Exchange exchange) throws Exception {
              try {
                  exchange.getIn().getBody(String.class);
              } catch( Exception ex) {
                  LOG.error("Logging for the error");
                  exchange.getIn().setBody(""); // TODO: Do something which makes sense
              }
          }
      }
      
      

      我不喜欢这种变通方法的原因是,在未来的版本中,框架可能会很好地处理相同的情况,我必须对代码进行更改才能删除这种变通方法。所以对我来说,泊松消息配置是要走的路。

      【讨论】:

      • 您也可以尝试使用 MapJmsMessage=false 这样 Camel 不会急于提取 JMS 消息体,这会导致早期异常
      【解决方案3】:

      将 body 转换从 jms 消费者端点转移到路由流也对我有帮助。

      在 jms 消费者中设置 mapJmsMessage=false 并添加

      .setBody().method(SimpleMessageConverter.class, "fromMessage")
      

      进入路由流程工作正常。

      我的意思是org.springframework.jms.support.converter.SimpleMessageConverter

      【讨论】:

        猜你喜欢
        • 2018-10-01
        • 1970-01-01
        • 2018-06-27
        • 2016-05-25
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多