【Posted】: 2019-01-01 20:25:12
【Question】:
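I have a Kafka topic (test-topic) with 3 partitions and a set of messages whose key can take only 3 distinct values. I want the messages to be partitioned according to the value of that key.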
import json

from kafka import KafkaProducer
from kafka.partitioner import DefaultPartitioner

messages = [{"partition_key": "k1", "x": 1},
            {"partition_key": "k2", "x": 2},
            {"partition_key": "k3", "x": 3},
            {"partition_key": "k1", "x": 4},
            {"partition_key": "k2", "x": 5}]

# The partitioner option takes a callable, not the result of calling it.
# KafkaProducer invokes it per message as
# partitioner(key_bytes, all_partitions, available_partitions).
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    partitioner=DefaultPartitioner(),
)

for m in messages:
    # Pass partition_key as the message key so the partitioner can hash it.
    producer.send("test-topic", key=m["partition_key"], value=m)
producer.flush()
In the code above, I want messages with the same partition_key value to go to the same partition.
【Discussion】:
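A hash-based partitioner keeps equal keys together, but it does not guarantee that three different keys land on three different partitions. If each key should get its own partition, one option is a custom partitioner callable. The following is only a minimal sketch: the KEY_TO_PARTITION mapping and the key_partitioner name are made up for illustration, and it assumes every message is sent with one of the three known keys, already serialized to bytes.

```python
# Hypothetical mapping from serialized key to a partition index (0-based).
# The keys and indices are assumptions based on the example messages.
KEY_TO_PARTITION = {b"k1": 0, b"k2": 1, b"k3": 2}


def key_partitioner(key_bytes, all_partitions, available_partitions):
    """Pick a fixed partition for each known key.

    Follows the call shape kafka-python uses for the partitioner option:
    partitioner(key_bytes, all_partitions, available_partitions).
    Raises KeyError for a missing or unknown key instead of guessing.
    """
    index = KEY_TO_PARTITION[key_bytes]
    # Sort so the choice does not depend on the order of the partition list.
    return sorted(all_partitions)[index]
```

To try it, pass partitioner=key_partitioner to KafkaProducer together with a key_serializer that encodes the key to bytes, and send each message with key=m["partition_key"]. Because the mapping is fixed, it breaks if the topic has fewer partitions than mapped indices.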
Tags: apache-kafka kafka-producer-api kafka-python