【问题标题】:How to pause/start/stop Kafka Producer / Kafka Template如何暂停/启动/停止 Kafka Producer / Kafka 模板
【发布时间】:2019-02-11 20:28:10
【问题描述】:
我正在使用带有 kafka 集成的 spring boot 应用程序,我想实现一个端点来停止和启动 kafka 发布消息。
消息由其他端点以异步方式触发。
bean KafkaTemplate<String, String> 或 ProducerFactory<String, String> producerFactory() 没有任何停止和暂停操作。
我的目标是能够模拟连接失败并确保将这些消息存储在我现有的备用机制中。
有什么想法吗?
【问题讨论】:
标签:
java
spring-boot
apache-kafka
kafka-producer-api
spring-kafka
【解决方案1】:
KafkaTemplate 没有这些回调,因为它是一个被动 组件,只有在我们调用它时才能执行这些操作。
对于连接失败的模拟,我建议您实现一个自定义ProducerFactory,生成一个KafkaProducer,并模拟或覆盖Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);。在那里,在ProducerFactory 中,您可以实现适当的生命周期回调并对上述send() 实现中的状态做出反应。
org.apache.kafka.clients.producer.MockProducer 可能有一些东西供您重复使用或借用。例如查看其close() 或fenceProducer()。