【问题标题】:Kafka Producer : Handle Exception in Async Send with CallbackKafka Producer:使用回调处理异步发送中的异常
【发布时间】:2019-10-04 17:02:00
【问题描述】:

如果异步发送到 Kafka,我需要捕获异常。 Kafka producer Api 自带一个函数 send(ProducerRecord 记录,Callback 回调)。但是当我针对以下两种情况进行测试时:

  • Kafka Broker 宕机
  • 未预先创建主题 回调没有被调用。相反,我在代码中收到发送失败的警告(如下所示)。

问题:

  • 那么回调是否只针对特定异常调用?

  • Kafka 客户端何时尝试在异步发送时连接到 Kafka 代理:在每批发送时还是定期发送?

Kafka Warning Image

注意:我还使用 25 秒的 linger.ms 设置来批量发送我的记录。


public class ProducerDemo {

    static KafkaProducer<String, String> producer;

    public static void main(String[] args) throws IOException {

         final Logger logger = LoggerFactory.getLogger(ProducerDemo.class);
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "30000");

        producer = new KafkaProducer<String, String>(properties);
        String topic = "first_topic";

        for (int i = 0; i < 5; i++) {
            String value = "hello world " + Integer.toString(i);
            String key = "id_" + Integer.toString(i);

            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);

              producer.send(record, new Callback() {
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        //execute everytime a record is successfully sent or exception is thrown
                        if(e == null){
                           // No Exception
                        }else{
                            //Exception Handling
                        }
                    }
                });
        }
        producer.close();
    }

【问题讨论】:

    标签: apache-kafka kafka-producer-api


    【解决方案1】:

    作为KafkaProducer 提供的弹性机制,您将收到关于不存在主题的警告。如果再等一会儿(默认应该是 60 秒),回调最终会被调用: 这是我的sn-p:

    因此,当出现问题并且异步发送不成功时,它会最终失败并导致未来失败或/和回调异常。 如果您没有以事务方式运行它,则仍然可能意味着批处理中的某些消息已找到到达代理的方式,而其他消息则没有。 如果您需要对发送到 Kafka 的每条消息的上游系统(如 http 摄取接口等)进行阻塞式确认,这肯定会是一个问题。做到这一点的唯一方法是使用未来的get 阻止每条消息,如documentation 中所述:

    总的来说,我注意到很多与 KafkaProducer 交付语义和保证相关的问题。它肯定可以更好地记录下来。

    还有一件事,既然你提到了linger.ms

    请注意,及时到达的记录通常会 即使与 linger.ms=0 一起批处理,所以在重负载下批处理会 无论 linger 配置如何,都会发生

    【讨论】:

      【解决方案2】:

      那么回调是否只针对特定异常调用?

      是的,它就是这样工作的。来自文档(2.5.0):

           * Fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that
           * will be invoked when the request is complete.
      

      注意重要的部分:当请求完成时,这意味着生产者必须已经接受了记录并将 ProduceRequest 发送给 Kafka Broker。如果不深入挖掘内部,这意味着必须存在代理元数据并且必须存在分区。

      当涉及到正式规范时,您需要好好看看send() 的Javadoc,可能还需要看看KafkaProducer 对doSend 方法的实现。在那里你会看到在提交调用中可以抛出多个异常(而不是返回未来并调用回调),例如:

      • 如果代理元数据在给定的超时时间内不可用,
      • 如果数据无法序列化,
      • 如果序列化表单太大等

      【讨论】:

        【解决方案3】:

        对于第一个问题,这是答案。 根据 apache kafka 文档,您可以在实现 Callback 接口时使用 onCompletion 方法捕获以下异常

        https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/producer/Callback.html

        对于第二个问题,以下属性的组合控制何时发送记录,据我了解,同步或异步调用是相同的。

        linger.ms max.block.ms

        https://kafka.apache.org/documentation/#linger.ms

        【讨论】:

          猜你喜欢
          • 2019-07-13
          • 2017-05-06
          • 1970-01-01
          • 2017-12-18
          • 2018-08-22
          • 1970-01-01
          • 2011-09-11
          • 1970-01-01
          • 2019-08-29
          相关资源
          最近更新 更多