【问题标题】:TypeError: partitions must be TopicPartition namedtuplesTypeError: partitions must be TopicPartition namedtuples
【发布时间】:2021-10-25 21:24:43
【问题描述】:

我想使用kafka-python中的KafkaConsumer来消费一个主题中的前N条消息:

from kafka import KafkaConsumer as kc
import json

bootstrap_servers = ['xx.xxx.xx.xxx:9092']
topic_name = 'my_topic_name'

consumer = kc(topic_name, group_id='group1', bootstrap_servers=bootstrap_servers,
              auto_offset_reset='earliest', auto_commit_interval_ms=1000,
              enable_auto_commit=True,
              value_deserializer=lambda x: json.loads(x.decode('utf-8')))
count = 0
consumer.seek_to_beginning((topic_name,0))
kjson = []
for msg in consumer:
    if count < 10:
        count = count + 1
        kjson.append(msg.value)
    else:
        print(json.dumps(kjson, indent=4))
        break

consumer.seek_to_beginning((topic_name,0)) 这一行给了我上述错误。 documentation 指定:

seek_to_beginning(*partitions)[source]
Seek to the oldest available offset for partitions.

Parameters: *partitions – Optionally provide specific TopicPartitions, otherwise default to all assigned partitions.
Raises: AssertionError – If any partition is not currently assigned, or if no partitions are assigned.

本主题共有 32 个分区(索引从 031)。从头开始的正确语法是什么?

【问题讨论】:

    标签: python apache-kafka kafka-python


    【解决方案1】:

    正如它所说的

    必须是 TopicPartition 命名元组

    例如

    from kafka.structs import TopicPartition
    
    
    ...
    consumer.seek_to_beginning(TopicPartition(topic_name,0))
    

    本主题共有 32 个分区(索引从 0 到 31)

    tps = [TopicPartition(topic_name, i) for i in range(32)]
    consumer.seek_to_beginning(tps)
    

    【讨论】:

    • 谢谢。对于此语法,我需要删除 consumer = kc.... 行中的 topic_name 参数。它有效。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-07-26
    • 2012-12-04
    • 2018-12-23
    相关资源
    最近更新 更多