【发布时间】:2020-12-26 21:36:32
【问题描述】:
对我来说,kafka 事务生产者的行为似乎与普通生产者一样,消息在主题上可见,因为每条消息都调用了 send。也许我错过了一些基本的东西。我希望消息仅在调用生产者提交方法后才会出现在主题中。在我下面的代码中,produce.commitTransactions() 被注释掉了,但我仍然收到主题中的消息。感谢您的任何指点。
public static void main(String[] args) {
try {
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer-1");
producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // enable idempotence
producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id-1"); // set transaction id
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(producerConfig);
producer.initTransactions(); //initiate transactions
try {
producer.beginTransaction(); //begin transactions
for (Integer i = 0; i < 1000; i++) {
producer.send(new ProducerRecord<String, String>("t_test", i.toString(), "value_" + i));
}
// producer.commitTransaction(); //commit
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
} catch (Exception e) {
System.out.println(e.toString());
}
}
【问题讨论】:
标签: apache-kafka transactions kafka-consumer-api kafka-producer-api