【问题标题】:Pushing messages in kafka to separate partitions by a key将 kafka 中的消息通过 key 推送到不同的分区
【发布时间】:2019-01-01 20:25:12
【问题描述】:

我有一个包含 3 个分区的 Kafka 主题(test-topic),以及一组包含只能采用 3 种类型值的键的消息,我希望这些消息分开根据它们的值进行分区。

from kafka import KafkaProducer
from kafka.partitioner import DefaultPartitioner

messages = [{"partition_key":"k1", "x":1},
            {"partition_key":"k2", "x":2},
            {"partition_key":"k3", "x":3},
            {"partition_key":"k1", "x":4},
            {"partition_key":"k2", "x":5}]

partitioner = DefaultPartitioner()
all_partitions = list(range(100))
available = all_partitions
dataPartitioner = partitioner(b'partition_key', all_partitions, available)

producer = KafkaProducer(bootstrap_servers="localhost:9092", value_serializer=lambda v: json.dumps(v).encode('utf-8'), partitioner = dataPartitioner)

for m in messages:
  producer.send('test-topic', m)
producer.flush()

在上面的代码中,我希望 partition_key 值相同的消息进入同一个分区。

【问题讨论】:

    标签: apache-kafka kafka-producer-api kafka-python


    【解决方案1】:

    您需要编写Partitioner 接口的自定义实现,并在初始化时将该类提供给KafkaProducer

    例如,

     private static Properties createProducerConfig(String brokers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        //more properties
        props.put("partitioner.class","com.app.KafkaUserCustomPatitioner");
        return props;
        }
    

    【讨论】:

      猜你喜欢
      • 2020-05-15
      • 1970-01-01
      • 2019-02-04
      • 2018-10-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-08-25
      相关资源
      最近更新 更多