【问题标题】:Uneven Distribution of messages in Kafka PartitionsKafka 分区中消息分布不均
【发布时间】:2018-11-26 08:02:51
【问题描述】:

我有一个主题有 10 个分区,1 个消费者组有 4 个消费者,工作人员大小为 3。

我可以看到分区中的消息分布不均匀,一个分区有这么多数据而另一个是空闲的。

我怎样才能让我的生产者将负载平均分配到所有分区中,以便所有分区都被正确利用?

【问题讨论】:

  • 我需要澄清一些事情。您使用的是自定义分区策略还是默认分区策略?怎么,你知道消息分布不均吗?
  • @IndraneelBende 当我描述我的主题时,它显示了延迟,通过它我可以确认某些分区的延迟超过 1lac,而有些分区的延迟为 0,这意味着在划分。不确定策略,但这是我可以在代码中看到的:this.partitionerClass = props.getString("partitioner.class", "kafka.producer.DefaultPartitioner");
  • 如果您使用默认分区程序,则消息将在不同分区之间以循环方式生成。你是如何计算这个延迟的?
  • Lag=LOG END OFFSET - CURRENT OFFSET 是的,这就是 kakfa 文档所说的,但不明白为什么一个分区过载而另一个分区空闲。

标签: apache-kafka kafka-producer-api


【解决方案1】:

根据DefaultPartitioner类本身的JavaDoc注释,默认的分区策略是:

  • 如果记录中指定了分区,则使用它。
  • 如果未指定分区但存在键,则根据键的哈希选择分区。
  • 如果不存在分区或键,则以循环方式选择分区。

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java

因此,这里有两个可能导致分布不均的原因,具体取决于您是否在生成消息时指定了密钥:

【讨论】:

  • “您多次指定同一个键”是什么意思?您能否详细说明多次指定相同键的原因以及如何避免这种情况?
  • 这是基于对答案中提到的DefaultPartitioner分区策略中的第二点的解释(以及实践经验)。由于默认情况下,分区是通过对键进行哈希处理来选择的,因此为消息指定相同的键值(这是允许的)会导致具有相同键的所有消息都放在同一个分区上。
【解决方案2】:

您可以为生产者分配一个分区号,而不是使用默认的分区器类,以便消息直接发送到指定的分区,

 ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName, partitionNumber,key, value);

【讨论】:

    【解决方案3】:

    似乎您的问题是消息的消耗不均,而不是对 Kafka 主题的消息产生不均。换句话说,您的读取线程数量与您拥有的分区数量不匹配(虽然它们不需要匹配 1:1,但每个消费者线程要读取的分区数量相同)。

    See 简短说明了解更多详情。

    【讨论】:

      【解决方案4】:

      您可以使用生产者记录的关键参数。这是一件事,对于特定键,数据现在总是进入同一个分区,我不知道你的生产者记录的结构,但正如你所说你有 10 个分区,那么你可以简单地使用 n%10 作为你的生产者记录键。 其中 n 是 0 到 9 现在您的记录 0 密钥将为 0,然后 kafka 将生成一个哈希密钥并将其放入某个分区,例如分区 0,对于记录 1,它将是一个,然后它将进入第一个分区等等。 这样,您将能够在您的生产者记录上应用循环,您的密钥将独立于记录中的字段,因此您可以将变量 n 和密钥作为 n%10。

      或者您可以在生产者记录中指定分区。因此,您要么使用生产者记录的键或分区字段。

      【讨论】:

        【解决方案5】:

        如果您已经从记录中定义了分区器,假设在 Kafka 中键是字符串,值是学生 Pojo。

        在学生 Pojo 中,假设基于学生国家字段,我想进入特定分区。假设一个主题有 10 个分区,例如,在值中,“印度”是一个国家,基于“印度”我们得到了 5 号分区。

        每当国家是“印度”时,Kafka 将分配 5 号分区,并且该记录总是转到 5 号分区(如果分区没有改变)。

        假设在您的管道中有很多记录即将到来并且有一个国家“印度”,所有这些记录都将转到第 5 号分区,您会看到 Kafka 分区中的分布不均匀。

        【讨论】:

          【解决方案6】:

          在我的例子中,我使用了默认分区器,但一个分区中的记录仍然比其他分区多得多。问题是我意外地有许多具有相同键的记录。检查你的钥匙!

          【讨论】:

            【解决方案7】:

            由于我无法使用 Faust 解决此问题,因此我使用的方法是自己实现“循环”分发。

            我遍历我的记录以生成并执行例如:

            for index, message in enumerate(messages):
                topic.send(message, partition=index % num_partitions)
            

            即将我的索引绑定到我拥有的分区范围内。

            仍然可能存在不平衡 - 考虑您反复运行此命令,但您的记录数少于 num_partitions - 那么您的第一个分区将继续获得主要的消息份额。您可以通过添加随机偏移量来避免此问题:

            import random
            initial_partition = random.randrange(0, num_partitions)
            for index, message in enumerate(messages):
                topic.send(message, partition=(initial_partition + index) % num_partitions)
            

            【讨论】:

              猜你喜欢
              • 2023-02-01
              • 1970-01-01
              • 1970-01-01
              • 2018-11-21
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              相关资源
              最近更新 更多