【Posted】: 2017-12-14 14:07:31
【Question】:
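Before I post my question, I would like to thank Gary and Artem for helping me solve my earlier problem: I can now successfully publish messages from JMS to Kafka, with transactions in place.
Now I am facing another problem while testing what happens when Kafka is down. For the first few retries while Kafka is down, the Kafka outbound adapter throws an exception, the message is returned to JMS, and delivery is retried again and again.
However, after a few retries the message is dequeued from JMS even though Kafka is still down, and I get the following exception: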
2017-07-10 23:27:51.117 ERROR 16116 --- [enerContainer-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='Test JPMC' to topic test:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
My integration XML is:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jms="http://www.springframework.org/schema/integration/jms"
       xmlns:integration="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration/jms
           http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
           http://www.springframework.org/schema/integration/kafka
           http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">

    <jms:message-driven-channel-adapter
        id="helloJMSAdapater" container="requestListenerContainer"
        channel="helloChannel" extract-payload="true" error-channel="errorChannel"/>

    <integration:recipient-list-router
        input-channel="errorChannel">
        <integration:recipient channel="errorOutputChannel" />
        <integration:recipient channel="rethrowChannel" />
    </integration:recipient-list-router>

    <jms:outbound-channel-adapter id="errorQueueChannelAdapter"
        channel="errorOutputChannel" destination="errorQueue" connection-factory="jmsConnectionfactory"
        delivery-persistent="true" explicit-qos-enabled="true" />

    <int-kafka:outbound-channel-adapter
        id="kafkaOutboundChannelAdapter" kafka-template="kafkaTemplate"
        auto-startup="true" sync="true" channel="inputToKafka" topic="test">
    </int-kafka:outbound-channel-adapter>

</beans>
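I don't want the JMS messages to be acknowledged unless they have been successfully published to Kafka.
Is this happening because of some default parameters set in Kafka?
My Kafka configuration is as follows: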
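import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Component;

@Configuration
@Component
public class KafkaConfig {

    // Template referenced by kafka-template="kafkaTemplate" in the integration XML
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // this.brokerAddress
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // set more properties
        return new DefaultKafkaProducerFactory<>(props);
    }
}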
【Discussion】:
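One point that may help narrow this down: the `60000 ms` in the `TimeoutException` matches the default value of the producer property `max.block.ms`, which bounds how long a send may block while waiting for metadata. The sketch below only illustrates the kind of entries that could replace the `// set more properties` placeholder. The class name `ProducerTimeoutProps`, the method `timeoutProps`, and every numeric value are assumptions made for illustration, and plain string keys are used so the snippet runs without the Kafka client on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the original configuration.
final class ProducerTimeoutProps {

    // Returns extra producer properties keyed by their plain-string names.
    static Map<String, Object> timeoutProps() {
        Map<String, Object> props = new HashMap<>();
        // Upper bound on how long a send may block waiting for metadata.
        // The default is 60000 ms; 10000 is an illustrative value only.
        props.put("max.block.ms", 10000);
        // How long the client waits for a broker response (illustrative value).
        props.put("request.timeout.ms", 15000);
        // Require acknowledgement from all in-sync replicas before a send succeeds.
        props.put("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        // Print the properties so they can be compared with the running config.
        timeoutProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These entries could be merged into the `props` map in `producerFactory()` with `props.putAll(ProducerTimeoutProps.timeoutProps())`. They only change how quickly the producer gives up; whether the JMS message is eventually dequeued probably also depends on the JMS broker's redelivery settings, which this sketch does not cover.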
Tags: apache-kafka spring-integration activemq