【问题标题】:How to configure consumer-level transactional redelivery with Camel and IBM MQ如何使用 Camel 和 IBM MQ 配置消费者级别的事务重新交付
【发布时间】:2021-01-31 21:30:02
【问题描述】:

我正在尝试使用连接到 IBM MQ 的 Apache Camel 在 Java Spring Boot 中完成一个事务性 JMS 客户端。此外,当消息处理失败时,客户端需要应用指数退避重新传递行为。原因:来自 MQ 的消息需要处理并转发到可能停机维护数小时的外部系统。使用事务来保证至少一次处理保证对我来说似乎是合适的解决方案。

我已经研究了这个主题很多小时,但未能找到解决方案。我将从我目前拥有的开始:

  @Bean
  UserCredentialsConnectionFactoryAdapter uccConnectionFactoryAdapter ()
      throws IOException {
    MQConnectionFactory factory = new MQConnectionFactory();
    factory.setCCDTURL(tabFilePath);

    UserCredentialsConnectionFactoryAdapter adapter =
        new UserCredentialsConnectionFactoryAdapter();
    adapter.setTargetConnectionFactory(factory);
    adapter.setUsername(userName);
    bentechConnectionFactoryAdapter.setPassword(password);

    return adapter;
  }

  @Bean
  PlatformTransactionManager jmsTransactionManager(@Autowired UserCredentialsConnectionFactoryAdapter uccConnectionFactoryAdapter) {
    JmsTransactionManager txMgr = new JmsTransactionManager(uccConnectionFactoryAdapter);
    return txMgr;
  }

  @Bean()
  CamelContextConfiguration contextConfiguration(@Autowired UserCredentialsConnectionFactoryAdapter uccConnectionFactoryAdapter,
      @Qualifier("jmsTransactionManager") @Autowired PlatformTransactionManager txMgr) {
    return new CamelContextConfiguration() {
      @Override
      public void beforeApplicationStart(CamelContext context) {
        JmsComponent jmsComponent = JmsComponent.jmsComponentTransacted(uccConnectionFactoryAdapter, txMgr);
        // required for consumer-level redelivery after rollback
        jmsComponent.setCacheLevelName("CACHE_CONSUMER");
        jmsComponent.setTransacted(true);
        jmsComponent.getConfiguration().setConcurrentConsumers(1);

        context.addComponent("jms", jmsComponent);
      }

      @Override
      public void afterApplicationStart(CamelContext camelContext) {
        // Do nothing
      }
    };
  }

// in a route builder
...
from("jms:topic:INPUT_TOPIC?clientId=" + CLIENT_ID + "&subscriptionDurable=true&durableSubscriptionName="+ SUBSCRIPTION_NAME)
    .transacted()
    .("direct:processMessage");
...

我能够通过集成测试验证事务行为。如果在消息处理期间发生未处理的异常,事务将回滚并重试。问题是,它会立即每秒重试几次,这可能会给 IBM MQ 管理器和外部系统造成很大的负载。

对于 ActiveMQ,重新交付策略很容易做到,网上有很多例子。 ActiveMQConnectionFactory 有一个 setRedeliveryPolicy 方法,意思是 ActiveMQ 客户端库内置了重新传递逻辑。据我所知,这与 Camel 的 Transactional Client EIP 的文档一致,其中指出:

事务模式下的重新交付不是由 Camel 处理,而是由支持系统(事务管理器)处理。在这种情况下,您应该求助于后备系统如何配置重新传递。

我绝对无法弄清楚如何为 IBM MQ 实现相同的目标。 IBM 的MQConnectionFactory 不支持重新交付策略。事实上,在 MQ 知识中心搜索 redeliverypolicy 会准确地找到...击鼓... 0 次点击。我什至浏览了一下 MQConnectionFactory 的实现,也没有发现任何东西。

我研究的另一个支持系统是JmsTransactionManager。搜索“jmstransactionmanager 重新交付策略”或“jmstransactionmanager 指数退避”也没有发现任何有用的信息。有一些关于 TransactionTemplateAbstractMessageListenerContainer 的讨论,但 1) 我没有看到与重新交付政策有任何联系,2) 我无法弄清楚这些政策如何与 Camel 和 JMS 交互。

Sooo,有人知道如何使用 Apache Camel 和 IBM MQ 实施指数退避重新交付策略吗?

结束说明:Camel 支持errorHandleronException 上的重新交付政策与事务/连接支持系统中的重新交付政策相同。这些处理程序使用处于任何状态的“Exchange”对象在故障点重试,而不会从路由开始回滚和重新处理消息。该事务在整个重试期间保持活动状态,并且仅当errorHandleronException 放弃时才会发生回滚。对于可能持续数小时的重试,这不是我想要的。

【问题讨论】:

  • 这个答案有帮助吗? stackoverflow.com/questions/63292922/…
  • @JoshMc 不确定我将如何在 CamelContext 的上下文中访问 DMLC 并与之交互。然而,那个帖子确实给了我一个想法。我现在正在研究 RouterPolicy 概念和 ThrottlingInflightRoutePolicy 实现。也许我可以使用它来实施我自己的退避政策。

标签: apache-camel jms ibm-mq spring-jms mq


【解决方案1】:

看起来@JoshMc 为我指明了正确的方向。我设法实现了RoutePolicy,它会随着延迟的增加而延迟重新交付。我已经运行了几个小时的测试会话和数千次相同消息的重新传递,以查看是否存在内存泄漏、MQ 连接耗尽等问题。我没有观察到任何问题。与 MQ 管理器有两个稳定的 TCP 连接,Java 进程的内存使用量在很近的范围内移动。

import java.util.Timer;
import java.util.TimerTask;
import javax.jms.Session;
import lombok.extern.log4j.Log4j2;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Route;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.support.RoutePolicySupport;

@Log4j2
public class ExponentialBackoffPolicy extends RoutePolicySupport implements CamelContextAware {
  final static String JMSX_DELIVERY_COUNT = "JMSXDeliveryCount";
  private CamelContext camelContext;

  @Override
  public void setCamelContext(CamelContext camelContext) {
    this.camelContext = camelContext;
  }

  @Override
  public CamelContext getCamelContext() {
    return this.camelContext;
  }

  @Override
  public void onExchangeDone(Route route, Exchange exchange) {
    try {
      // ideally we would check if the exchange is transacted but onExchangeDone is called after the
      // transaction is already rolled back, and the transaction context has already been removed.
      if (exchange.getException() == null)
      {
        log.debug("No exception occurred, skipping route suspension.");
        return;
      }

      int deliveryCount = getRetryCount(exchange);
      int redeliveryDelay = getRedeliveryDelay(deliveryCount);
      log.info("Suspending route {} for {}ms after exception. Current delivery count {}.",
          route.getId(), redeliveryDelay, deliveryCount);

      super.suspendRoute(route);
      scheduleWakeup(route, redeliveryDelay);
    } catch (Exception ex) {
      // only log exception and let Camel continue as of this policy didn't exist.
      log.error("Exception while suspending route", ex);
    }
  }

  void scheduleWakeup(Route route, int redeliveryDelay) {
    Timer timer = new Timer();
    timer.schedule(
        new TimerTask() {
          @Override
          public void run() {
            log.info("Resuming route {} after redelivery delay of {}ms.", route.getId(), redeliveryDelay);
            try {
              resumeRoute(route);
            } catch (Exception ex) {
              // only log exception and let Camel continue as of this policy didn't exist.
              log.error("Exception while resuming route", ex);
            }
            timer.cancel();
          }
        },
        redeliveryDelay);
  }

  int getRetryCount(Exchange exchange) {
    Message msg = exchange.getIn();
    return (int) msg.getHeader(JMSX_DELIVERY_COUNT, 1);
  }

  int getRedeliveryDelay(int deliveryCount) {
    // very crude backoff strategy for now, will need to refine later
    if (deliveryCount < 10) return 1000;
    if (deliveryCount < 20) return 5000;
    if (deliveryCount < 30) return 20000;
    return 60000;
  }
}

这就是它在路由定义中的使用方式:

    from(mqConnectionString)
        .routePolicy(new ExponentialBackoffPolicy())
        .transacted()
        ...

    // and if you want to distinguish between retriable and non-retriable situations, apply the following two exception handlers
    onException(NonRetriableProcessingException.class)
        .handled(true)
        .log(LoggingLevel.WARN, "Non-retriable exception occurred, discard message.");

    onException(Exception.class)
        .handled(false)
        .log(LoggingLevel.WARN, "Retriable exception occurred, retry message.");

需要注意的一点是,JMSXDeliveryCount 标头来自 MQ 管理器,并由此计算重新传递延迟。当您在消息永久失败时使用ExponentialBackoff 策略重新启动应用程序时,重新启动时它将立即尝试重新处理该消息,但如果再次失败,则应用与重新传递总数相对应的延迟,而不是重新开始最初的短暂延迟。

【讨论】:

  • 很好的问答。如果您还想防止有害消息永远延迟,您可以将其与在队列管理器 QLOCAL 定义上配置 Back Out 队列结合使用。您将BOQNAME 设置为要将消息移动到的另一个队列的名称,并将BOTHRESH 设置为在将消息移动到该队列之前允许的尝试次数。
  • 感谢您的建议。我读过这些。但是,我可能会构建一个依赖于某些异常类型的客户端解决方案,并通过将这些消息转发到 DLQ 并将交换标记为成功来立即“退出”这些消息。这样做的原因是,下游系统可能会在几个小时甚至整个周末都不可用,因此重新交付尝试的总持续时间几乎是不可预测的。我认为用无法处理的消息识别这些场景会更有效,即使我们仍然存在一定的风险,我们会错过一两个场景。 :-(
  • 听起来是一个很好的计划,我希望更多的应用程序开发人员能在 MQ 消费应用程序上投入这么多精力:)
  • @Christoph,这真是一个很有帮助的话题。有一个相关的问题,如果出现任何问题,我们可以如何限制重试次数,然后需要将原始消息写入文件(我认为可以在需要时使用 ProducerTemplate 来完成)和提交/确认消息,这样就不会再次重播?
  • @getSantsoh 不幸的是,我无法准确说明如何做到这一点,但希望这些是一些有用的想法。 Camel 还具有死信通道功能。 camel.apache.org/components/3.4.x/eips/dead-letter-channel.html 但通读它的描述,似乎它确实在客户端重试,这是我不想要的。您可能会编写一个 onException 处理程序,该处理程序查看 JMS deliveryCount 属性并将交换定向到某个数量后的某个错误输出路由。请参阅我在上面的帖子中添加的代码示例。
猜你喜欢
  • 2014-10-23
  • 1970-01-01
  • 2016-02-21
  • 1970-01-01
  • 2016-10-10
  • 1970-01-01
  • 2016-05-05
  • 1970-01-01
  • 2021-06-07
相关资源
最近更新 更多