【发布时间】:2019-10-04 17:02:00
【问题描述】:
如果异步发送到 Kafka,我需要捕获异常。 Kafka producer Api 自带一个函数 send(ProducerRecord 记录,Callback 回调)。但是当我针对以下两种情况进行测试时:
- Kafka Broker 宕机
- 未预先创建主题 回调没有被调用。相反,我在代码中收到发送失败的警告(如下所示)。
问题:
那么回调是否只针对特定异常调用?
Kafka 客户端何时尝试在异步发送时连接到 Kafka 代理:在每批发送时还是定期发送?
注意:我还使用 25 秒的 linger.ms 设置来批量发送我的记录。
public class ProducerDemo {
static KafkaProducer<String, String> producer;
public static void main(String[] args) throws IOException {
final Logger logger = LoggerFactory.getLogger(ProducerDemo.class);
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "30000");
producer = new KafkaProducer<String, String>(properties);
String topic = "first_topic";
for (int i = 0; i < 5; i++) {
String value = "hello world " + Integer.toString(i);
String key = "id_" + Integer.toString(i);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//execute everytime a record is successfully sent or exception is thrown
if(e == null){
// No Exception
}else{
//Exception Handling
}
}
});
}
producer.close();
}
【问题讨论】:
标签: apache-kafka kafka-producer-api