【问题标题】:Kafka - Producer AcknowledgementKafka - 生产者致谢
【发布时间】:2021-07-18 19:58:38
【问题描述】:

我在一个视频教程中看到,Kafka Broker 在生产者发布消息时支持 3 种类型的确认。

0 - 一劳永逸
1 - 领导确认
2 - 所有经纪人的确认

我正在使用 Kafka 的 Java API 来发布消息。这是必须使用特定于每个代理的 server.properties 为每个代理设置的东西,还是必须由生产者设置的东西?如果必须由生产者设置,请说明如何使用 Java API 设置。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class KafkaProducerApp {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094");
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);

        try{
            for(int i=0;i<150;i++) {
                RecordMetadata ack = kafkaProducer.send(new ProducerRecord<String, String>("replicated_topic", Integer.toString(i), "MyMessage" + Integer.toString(i))).get();
                System.out.println(" Offset = " + ack.offset());
                System.out.println(" Partition = " + ack.partition());
            }
        } catch (Exception ex){
            ex.printStackTrace();
        } finally {
            kafkaProducer.close();
        }



    }

}

【问题讨论】:

  • 其实我以为它支持一个连续的值范围:-1 == "all" == leader and all in-sync-replicas, 0 == fire-and-forget, 1 == just领导者,2 ==领导者和一个副本,3 ==领导者和两个副本,等等?
  • 我想 acks >1 从 v0.9 开始被删除:cwiki.apache.org/confluence/display/KAFKA/…

标签: java apache-kafka


【解决方案1】:

它是一个生产者属性,其设置类似于您在代码中拥有的其他属性:

properties.put("acks","all");

可以在here找到所有可配置生产者属性的列表。

您可能还想查看与此生产者配置相关的代理(或主题)propertymin.insync.replicas

【讨论】:

  • 非常感谢@vahid
  • @vahid 这是否意味着当我们关闭生产者时,它实际上会等到我们得到所有消息的确认?或者我们可以在下一次或下一次发送之前检查代码中的某处。
  • 是的,来自KafkaProducer的JavaDocThis method blocks until all previously sent requests complete.
【解决方案2】:

我认为你应该了解 acks 属性实际上做了什么,并看看幕后。如果没问题,您将看到此属性由生产者配置。

例如,您不能丢失任何消息,例如审核日志。以下代码我们将如何启动我们的生产者配置:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("acks", "all"); //We are using acks=all in order to get the strongest guarantee we can.
props.put("retries", "3");
props.put("max.in.flight.requests.per.connection", "5");

这是一个小而强大的变化,对消息是否到达有重大影响。

这张图片来自Kafka In Action这本书,更清楚地代表acks属性:

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-01-23
    • 2019-10-31
    • 2016-01-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多