【问题标题】:Kafka Producer API - onCompletionKafka Producer API - onCompletion
【发布时间】:2019-06-13 03:32:17
【问题描述】:

尝试了解 Java 生产者 API。 onCompletion 是什么意思?帮助我理解相同的。

参考: https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
 producer.send(myRecord,
               new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {
                       if(e != null) {
                          e.printStackTrace();
                       } else {
                          System.out.println("The offset of the record we just sent is: " + metadata.offset());
                       }
                   }
               });

【问题讨论】:

    标签: apache-kafka kafka-producer-api


    【解决方案1】:

    在方法中

    producer.send(ProducerRecord<K,V> record, new Callback() {
                       public void onCompletion(RecordMetadata metadata, Exception e) {...}
      });
    

    一旦确认发送,就会调用回调。回调在后台 I/O 线程中执行,所以它应该很快(不要阻塞它)

    默认情况下,发送是异步的,一旦记录存储在等待发送的记录缓冲区中,此方法将立即返回。这允许并行发送许多记录,而不会阻塞等待每个记录之后的响应。

    Send 返回 RecordMetadata,它指定记录发送到的分区、分配给它的偏移量和时间戳。

    这是一种异步方法,而您可以使用同步方式来做同样的事情:

    producer.send(record).get();

    【讨论】:

      猜你喜欢
      • 2021-11-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-05-19
      • 2021-11-25
      • 1970-01-01
      • 2019-02-23
      相关资源
      最近更新 更多