【问题标题】:Kafka Producer publishing message to single partitionKafka Producer 将消息发布到单个分区
【发布时间】:2020-01-27 15:15:54
【问题描述】:

我是 Kafka 新手,正在阅读可用的官方文档。

在我的本地系统上,我与 zookeeper 一起启动了一个 kafka 实例。 Zookeper 和 kafka 服务器都在默认端口上运行。

我创建了一个主题“测试”,复制因子为 1,因为我只有一个 kafka 实例启动并运行。

我还创建了两个分区。

我有两个消费者在同一个消费者组中订阅了这个队列。

现在我已经在 Windows 机器上使用命令提示符启动了消费者。

当我从命令提示符启动生产者并将消息发布到主题时,一切正常。 Kafka 使用轮询将消息推送到两个分区,每个消费者交替接收消息,因为他们每个人都在监听单独的分区。

但是当我使用 java kafka-client jar 创建生产者时,即使我对消息使用不同的密钥,生产者也会将所有消息推送到同一个分区,因为所有消息都在同一个消费者上接收。

分区也不是静态的,每次我运行生产者时它都会不断变化。

我已经尝试了与从命令提示符启动的生产者相同的场景,其配置与我使用 java 代码提供给 kafka-client 生产者的配置完全相同。命令提示符生成器似乎工作正常,但代码生成器将所有消息推送到同一分区。

我已经尝试更改某些消息的密钥,希望代理将其发送到不同的分区,因为它在文档中提到代理使用消息的密钥路由消息。

public class KafkaProducerParallel {


public static void main(String[] args) throws InterruptedException, 
ExecutionException {

    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092");
    properties.put(ProducerConfig.CLIENT_ID_CONFIG, "parallelism- 
 producer");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
 StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
LongSerializer.class);


    Producer<String, Long> parallelProducer = new KafkaProducer<> 
(properties);

    for(long i=0;i<100;i++) {

        ProducerRecord<String, Long> producerRecord;

        if(i<50) {
            producerRecord = new ProducerRecord<String, 
 Long>("second-topic", "Amoeba", i);
        }else {
            producerRecord = new ProducerRecord<String, 
 Long>("second-topic", "Bacteria", i);
        }

        RecordMetadata recordMetadata = 
  parallelProducer.send(producerRecord).get();

        System.out.printf("Sent record : with key %s and value 
 %d to partition %s", producerRecord.key(), producerRecord.value(), 
 recordMetadata.partition());
        System.out.println();
    }

    parallelProducer.close();


}

}

根据文档,kafka 代理通过使用密钥(生成密钥哈希)决定将特定消息放入哪个分区。 我在一段时间后更改了记录的键,但消息仍然每次都发送到同一个分区。

代码的示例控制台输出:

  Sent record : with key Amoeba and value 0 to partition 1
  Sent record : with key Amoeba and value 1 to partition 1
  Sent record : with key Amoeba and value 2 to partition 1
  Sent record : with key Amoeba and value 3 to partition 1
  Sent record : with key Amoeba and value 4 to partition 1
  Sent record : with key Amoeba and value 5 to partition 1
  Sent record : with key Amoeba and value 6 to partition 1
  Sent record : with key Amoeba and value 7 to partition 1
  Sent record : with key Amoeba and value 8 to partition 1
  Sent record : with key Amoeba and value 9 to partition 1
  Sent record : with key Amoeba and value 10 to partition 1
  Sent record : with key Amoeba and value 11 to partition 1
  Sent record : with key Amoeba and value 12 to partition 1
  Sent record : with key Amoeba and value 13 to partition 1

 Sent record : with key Bacteria and value 87 to partition 1
 Sent record : with key Bacteria and value 88 to partition 1
 Sent record : with key Bacteria and value 89 to partition 1
 Sent record : with key Bacteria and value 90 to partition 1
 Sent record : with key Bacteria and value 91 to partition 1
 Sent record : with key Bacteria and value 92 to partition 1
 Sent record : with key Bacteria and value 93 to partition 1
 Sent record : with key Bacteria and value 94 to partition 1
 Sent record : with key Bacteria and value 95 to partition 1
 Sent record : with key Bacteria and value 96 to partition 1
 Sent record : with key Bacteria and value 97 to partition 1
 Sent record : with key Bacteria and value 98 to partition 1
 Sent record : with key Bacteria and value 99 to partition 1

【问题讨论】:

  • 你也可以分享错误信息
  • @harkeshkumar 控制台没有错误。我只是好奇为什么总是将记录发送到同一个分区
  • @harkeshkumar 我已经用代码的示例控制台输出更新了问题
  • 你只有这两个键吗?他们去同一个分区的机会是 50%。尝试 50 个不同的键,它们应该随机分散。
  • 如果您没有任何密钥,它将在分区之间随机分布。如果你有key,它将使用key来分配分区。

标签: java apache-kafka kafka-producer-api


【解决方案1】:

一切都按预期进行。

在您的特定情况下,KafkaProducer 使用的 Partitioner(用于确定分区)为两个键计算相同的分区:AmoebaBacteria。默认情况下,KafkaProducer 使用org.apache.kafka.clients.producer.internals.DefaultPartitioner

建议:更改key或增加分区数。

注意:生产者决定将消息放到哪个分区,而不是代理。

【讨论】:

    【解决方案2】:

    Producer&lt;String, String&gt; producer = new KafkaProducer&lt;String, String&gt;更改代码 到:

    KafkaProducer<String, String> producer = new KafkaProducer<String, String>
    

    默认情况下,接口实现将数据放入同一个分区。所以使用KafkaProducer而不是简单的Producer

    【讨论】:

    • 这是不正确的,两者都是 KafkaProducer 实例,将类型分配为具体类 vs 接口不会改变行为。
    【解决方案3】:

    从 Apache Kafka 2.4 及更高版本开始,默认分区策略已更改为具有空键的记录,因此粘性分区是默认行为。

    之前的循环策略意味着具有空键的记录将在分区之间拆分,新的粘性分区策略将记录发送到同一分区,直到分区的批处理“完成”(这由 batch.size 或 linger 定义.ms)

    查看这篇文章了解更多信息: Improvements with Sticky Partitioner

    【讨论】:

    猜你喜欢
    • 2018-10-27
    • 2016-10-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-26
    • 1970-01-01
    相关资源
    最近更新 更多