【问题标题】:Spring Integration - Kafka Outbound Adapter Acknowledge IssueSpring Integration - Kafka 出站适配器确认问题
【发布时间】:2017-12-14 14:07:31
【问题描述】:

在我发布我的问题之前,我要感谢 Gary 和 Artem 帮助我解决了我的问题,并且因为我能够成功地将消息从 JMS 发布到 Kafka,并且交易到位。

现在,我面临另一个问题并测试当我的 Kafka 宕机时会发生什么。 当 kafka 在前几次重试时关闭时,kafka 出站适配器会抛出异常并将消息返回给 JMS 并一次又一次地重试。

但是,经过几次重试后,即使 kafka 关闭,消息也会从 JMS 中出列,并且出现以下异常:

2017-07-10 23:27:51.117 ERROR 16116 --- [enerContainer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='Test JPMC' to topic test:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

我的集成 xml 是:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/integration/jms"
    xmlns:integration="http://www.springframework.org/schema/integration"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://www.springframework.org/schema/integration/jms 
    http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
    http://www.springframework.org/schema/integration/kafka 
    http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">

    <jms:message-driven-channel-adapter
        id="helloJMSAdapater" container="requestListenerContainer" 
        channel="helloChannel" extract-payload="true" error-channel="errorChannel"/>


    <integration:recipient-list-router
        input-channel="errorChannel">
        <integration:recipient channel="errorOutputChannel" />
        <integration:recipient channel="rethrowChannel" />
    </integration:recipient-list-router>

    <jms:outbound-channel-adapter id="errorQueueChannelAdapter"
        channel="errorOutputChannel" destination="errorQueue" connection-factory="jmsConnectionfactory"
        delivery-persistent="true" explicit-qos-enabled="true" />


    <int-kafka:outbound-channel-adapter
        id="kafkaOutboundChannelAdapter" kafka-template="kafkaTemplate"
        auto-startup="true" sync="true" channel="inputToKafka" topic="test">
    </int-kafka:outbound-channel-adapter>


</beans>

我不想确认 JMS 消息,除非它们成功发布到 kafka。

是不是因为kafka设置了一些默认参数?

我的 kafka 配置如下:

@Configuration
@Component
public class KafkaConfig {

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//this.brokerAddress);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // set more properties
        return new DefaultKafkaProducerFactory<>(props);
    }
}

【问题讨论】:

    标签: apache-kafka spring-integration activemq


    【解决方案1】:

    这不是卡夫卡的问题。如果您说您的消息“从 JMS 出队”,请确保将队列上的重新传递策略配置为无限。

    例如 ActiveMQ 的故事在这里:http://activemq.apache.org/redelivery-policy.html

    maximumRedeliveries 6 设置消息在被视为有毒药丸并返回到代理之前将被重新传递的最大次数,以便它可以进入死信队列。 设置为 -1 表示无限次重新投递。

    【讨论】:

    • 感谢 Artem 似乎可以解释错误。我添加了以下内容,现在工作正常。
    • 嗨 Artem,还有一个问题,kafka 出站适配器会在它启动并运行后自动接收消息,还是我需要指定任何特定配置。 :outbound-channel-adapter>
    • 它不依赖于 Kafka 通道适配器。它是 passive 组件,它只对传入的数据做出反应。由于您总是将故障回滚到 JMS,因此相同的消息会一次又一次地到达 Kafka CA。这已经是 JMS 入站通道适配器的责任,以确保相同的消息处理。这就是 TX 一直以来的工作方式。甚至在我们之后也会起作用。
    猜你喜欢
    • 2016-07-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-30
    • 1970-01-01
    • 1970-01-01
    • 2017-04-29
    • 1970-01-01
    相关资源
    最近更新 更多