【问题标题】:Spring Integration - JMS to Kafka message transfer - End to End TransactionSpring Integration - JMS 到 Kafka 消息传输 - 端到端事务
【发布时间】:2017-12-12 02:31:39
【问题描述】:

我正在使用以下代码使用来自 JMS ActiveMQ 的消息:

<jms:message-driven-channel-adapter
        id="helloJMSAdapater" destination="helloJMSQueue" connection-factory="jmsConnectionfactory"
        channel="helloChannel" extract-payload="true" />
<integration:channel id="helloChannel" />

我的要求是从这里消费并将其发布到 Kafka 出站适配器。使用以下配置:

<int-kafka:outbound-channel-adapter
            id="kafkaOutboundChannelAdapter"
            kafka-producer-context-ref="kafkaProducerContext"
            channel="inputToKafka">
</int-kafka:outbound-channel-adapter>

这是我想要实现的目标:

  1. 我的队列是一个持久主题,除非成功发布到 Kafka,否则不想确认记录。简而言之,我希望有一个从 jms 消费消息到将其发布到 Kafka 的事务行为。

  2. 我注意到我的消息立即出队,如果处理遇到一些异常,我无法重新处理它。我不希望这种情况发生。

  3. 另外,当 kafka 遇到一些问题时,我希望它返回到某个方法,以便我可以保留失败消息,并且如前所述不想确认它。

我真的很难让它发挥作用。有人可以帮帮我吗?

【问题讨论】:

    标签: spring apache-kafka spring-integration activemq


    【解决方案1】:

    你真的可以在&lt;jms:message-driven-channel-adapter&gt; 上拥有transaction-manager 来启动TX。

    &lt;int-kafka:outbound-channel-adapter&gt; 抛出异常时,它会导致 TX 被回调,因此消息将被重新排队。

    如果您对持续存在的错误感兴趣,&lt;jms:message-driven-channel-adapter&gt; 上有一个 error-channel 选项,但您仍然需要重新抛出异常以让 TX 回调。

    要使所有这些工作,您应该确保从乞讨到结束只有一个线程。流中没有 &lt;queue&gt;executor 频道。

    也不清楚你为什么仍然使用这么旧的 Apache Kafka...

    【讨论】:

    • 谢谢阿特姆。我会试一试并恢复。关于 Apache Kafka,我参考了 Spring 资源。有没有办法从 Kafka 捕获异常并将错误消息持久保存到某个商店。 kafka 适配器是否提供诸如错误通道之类的东西。
    • 你还需要sync=true在(新的2.0)kafka适配器上;您可以向 JMS 入站适配器添加错误通道;消息有效负载是 MessagingException,具有 causefailedMessage 属性。
    • 不,但是可以通过ExpressionEvaluatingRequestHandlerAdvice: docs.spring.io/spring-integration/reference/html/…
    猜你喜欢
    • 2016-02-10
    • 2017-02-28
    • 1970-01-01
    • 2016-06-14
    • 1970-01-01
    • 2021-06-04
    • 2020-04-30
    • 2018-01-20
    • 2020-01-25
    相关资源
    最近更新 更多