【问题标题】:Kafka: How to achieve Round Robin Partition in KafkaKafka:如何在 Kafka 中实现 Round Robin Partition
【发布时间】:2019-09-03 18:54:09
【问题描述】:

我是卡夫卡的新手。我的要求是,我有两个分区,例如 Partition-0 和 Partition-1,我有一个值列表,其中还包含 KEY 值。我想根据我的密钥存储数据,比如 key-1 将进入 Partition-0,key-2 将进入 Partition-1。使用旧 API 可以实现我们需要实现 Partition 接口的方法,但是我如何使用新 API 来做到这一点。谢谢

【问题讨论】:

    标签: apache-kafka kafka-producer-api


    【解决方案1】:

    如果您想要循环行为,请在写入 Producer 时不要传递密钥,DefaultPartitioner 将为您完成这项工作。您不需要编写自定义实现。来自 javadocs:

    /**
     * The default partitioning strategy:
     * <ul>
     * <li>If a partition is specified in the record, use it
     * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
     * <li>If no partition or key is present choose a partition in a round-robin fashion
     */
    

    【讨论】:

      【解决方案2】:

      使用新的生产者,您还可以实现Partitioner 接口(@98​​7654321@)来实现循环分发。

      您可以使用DefaultPartitioner作为参考-https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java

      【讨论】:

        【解决方案3】:

        您可以通过覆盖 kafka 生产者的default partitioner 以循环方式生产到 kafka。

        伪实现

        class RRPartitioner():
              def __init__():
                    # Using topic metadata get total number of partitions
                    self.total_partitions = client[topic].get_number_partitions()
                    self.part_offset = 0
        
              def partitioner(self, key, msg):
                  if self.part_offset > self.total_partitions:
                      self.part_offset = 0
                      return self.part_offset
                  else:
                      self.part_offset += 1
                      return self.part_offset
        

        上面的实现是纯循环,如果您希望消息根据键排序并具有循环,您将不得不在自定义分区器中做更多的事情。

        【讨论】:

        • 这是最简单的解决方案,但是如果在运行时添加分区就行不通,这是完全有效的情况
        • True..您将不得不重新启动生产者或定期轮询元数据更改。大多数现有的关键生产者都会遇到同样的问题 IIRC。
        【解决方案4】:

        从 Kafka 2.4.0 开始,您可以选择“始终”循环。

        https://issues.apache.org/jira/browse/KAFKA-3333

        【讨论】:

        • @AlpcanYıldız 不确定您所说的 Kafka 流属性是什么意思,但它是以生产者为中心的配置。您可以阅读有关 KafkaStreams 的信息,它说它在下面使用普通的生产者来编写处理过的流数据 - github.com/apache/kafka/blob/2.3/streams/src/main/java/org/…
        • 我其实想问,在kafka流属性中我可以使用默认分区器吗?我了解 kafka 流使用默认的普通分区器。例如,我可以说 props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, streamingConfig.getStandbyReplicas())。因为它是一个 StreamsConfig。但是我能说 props.put(Pro.NUM_STANDBY_REPLICAS_CONFIG, streamingConfig.getStandbyReplicas());但是我可以用这个吗? props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class)。
        • 你能检查我的问题吗?因为 Matthias 说我不能使用这个配置,但实际上我可以 (stackoverflow.com/questions/59645127/…)。如果我将 RoundRobinPartitioner 与 2.4 客户端和 2.2 版本的 kafka 一起使用,生产者使用循环设置,但我不能使用所有内部主题的分区。它会在 50 个分区上分发我的消息,例如 ~30 或 ~40 个分区
        • @AlpcanYıldız Matthias 的意思是——你不能直接使用它。我建议你检查 DefaultStreamPartitioner 类,看看它是如何在内部使用DefaultPartitioner 的实例的。您的自定义实现将简单地实现 StreamPartitioner 并在内部使用 RoundRobinPartitioner 来完成它。提醒一句 - 谨慎订购。此外,他在第二段中确实提到了这一点。底线 - 你不能直接通过 Producer.properties 使用它
        • 如果我不能像这样使用 with,你知道我怎么能做到这一点,所以我所有的 kafka-dsl 拓扑在所有内部主题中都使用循环分区器吗?
        猜你喜欢
        • 2017-04-22
        • 2019-11-02
        • 2016-12-10
        • 2019-04-28
        • 1970-01-01
        • 2021-06-13
        • 2020-08-20
        • 2020-03-14
        • 1970-01-01
        相关资源
        最近更新 更多