【发布时间】:2018-06-20 04:03:50
【问题描述】:
我们正在尝试为我们的生产者实现交易。我们的用例我们从 MQ 接收消息并发布到 kafka。当出现故障时,我们需要回滚发布到 kafka 的消息,并且不向 MQ 发送确认。
当我们使用事务时,我们看到消息在 kafka 主题中重复。
@Bean("producerConfig")
public Properties producerConfig() {
LOGGER.info("Creating Dev Producer Configs");
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);
configs.put(ProducerConfig.ACKS_CONFIG, "all");
configs.put(ProducerConfig.RETRIES_CONFIG, 1);
configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
return configs;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(new HashMap<String, Object>((Map) producerConfig));
producerFactory.setTransactionIdPrefix("spring-kafka-transaction");
return producerFactory;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setDefaultTopic(topic);
return kafkaTemplate;
}
@Bean
KafkaTransactionManager<String,String> kafkaTransactionManager(){
KafkaTransactionManager<String, String> transactionManager = new KafkaTransactionManager<>(producerFactory());
return transactionManager;
}
监听方法
@Component
public class WMQListener implements MessageListener {
KafkaTemplate<String, String> kafkaTemplate;
@Override
@Transactional
public void onMessage(Message message) {
String onHandXmlStr = null;
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
onHandXmlStr = textMessage.getText();
}
LOGGER.debug("Message Received from WMQ :: " + onHandXmlStr);
Msg msg = JaxbUtil.convertStringToMsg(onHandXmlStr);
List<String> onHandList = DCMUtil.convertMsgToList(msg);
ListenableFuture send = kafkaTemplate.sendDefault(onHandList.get(0));
send.addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable ex) {
ex.printStackTrace();
}
@Override
public void onSuccess(Object result) {
System.out.println(result);
}
});
message.acknowledge();
}
【问题讨论】:
-
您的
@Configuration课程中有@EnableTransactionManagement吗?您是否将您的消费者ISOLATION_LEVEL_CONFIG更改为READ_COMMITTED?如果两者都是,DEBUG 日志(org.apache.kafka、org.springframework.transaction)能否帮助您了解发生了什么? -
是的,我有
@EnableTransactionManagement,但我目前正在测试生产者应用程序并且尚未启动消费者,因为我观察到单个消息的偏移量增加了 2,所以我很担心。今天,我通过在同一主题上启动控制台使用者进行了测试,但没有明确指定READ COMMITTED,但我只收到消息而不是重复消息。但是,我想知道为什么当我使用@Transaction和KafkaTransactionManager而没有它时偏移量增加了两倍。
标签: apache-kafka spring-transactions spring-kafka