【问题标题】:kafka-python consumer start reading from offset (automatically)kafka-python 消费者从偏移量开始读取(自动)
【发布时间】:2019-01-18 19:28:21
【问题描述】:

我正在尝试使用 kafka-python 构建一个应用程序,其中消费者从一系列主题中读取数据。消费者永远不会两次阅读同一条消息,但也永远不会错过任何一条消息,这一点非常重要。

一切似乎都工作正常,除非我关闭消费者(例如失败)并尝试从偏移量开始读取。我只能读取主题中的所有消息(这会创建双重读取)或仅收听新消息(并错过在故障期间发出的消息)。暂停消费者时我没有遇到这个问题。

我创建了一个孤立的模拟来尝试解决问题。

这里是通用生产者:

from time import sleep
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

x=0 # set manually to avoid duplicates 

for e in range(1000):
    if e <= x:
        pass
    else:
        data = dumps(
            {
            'number' : e
        }
        ).encode('utf-8')

        producer.send('numtest', value=data)
        print(e, ' send.')

        sleep(5)

还有消费者。如果将auto_offset_reset 设置为'earliest',则将再次读取所有消息。如果auto_offset_reset 设置为'latest',则不会读取停机期间的任何消息。

from kafka import KafkaConsumer
from pymongo import MongoClient
from json import loads

## Retrieve data from kafka (WHAT ABOUT MISSED MESSAGES?)
consumer = KafkaConsumer('numtest', bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest', enable_auto_commit=True,
                         auto_commit_interval_ms=1000)


## Connect to database
client = MongoClient('localhost:27017')
collection = client.counttest.counttest

# Send data
for message in consumer:
    message = loads(message.value.decode('utf-8'))
    collection.insert_one(message)
    print('{} added to {}'.format(message, collection))

我觉得自动提交无法正常工作。

我知道这个问题类似于this one,但我想要一个具体的解决方案。

感谢您帮助我。

【问题讨论】:

    标签: python-3.x apache-kafka offset kafka-consumer-api kafka-python


    【解决方案1】:

    您收到此行为是因为您的消费者未使用消费者组。使用消费者组,消费者将定期向 Kafka 提交(保存)其位置。这样,如果它重新启动,它将从上次提交的位置恢复。

    要让你的消费者使用消费者组,你需要在构造它时设置group_id。 参见docs 中的group_id 描述:

    动态分区加入的消费者组名 分配(如果启用),并用于获取和提交 偏移量。如果没有,自动分区分配(通过组协调器) 并且偏移提交被禁用。默认值:无

    例如:

    consumer = KafkaConsumer('numtest', bootstrap_servers=['localhost:9092'],
                             auto_offset_reset='earliest', enable_auto_commit=True,
                             auto_commit_interval_ms=1000, group_id='my-group')
    

    【讨论】:

    • 谢谢米凯尔!更改了我的代码,它现在就像一个魅力。
    【解决方案2】:

    是否可以使用来自不同服务器的消费者。我已经尝试过下面的代码,它没有从 kafka 获取任何数据。

    consumer = KafkaConsumer('tet', bootstrap_servers=['192.168.1.20:9092'],
                         auto_offset_reset='earliest', enable_auto_commit=True,
                         auto_commit_interval_ms=1000, group_id=None)
    

    注意:- 当我提供错误的 ip 或端口号时,它会抛出异常。

    【讨论】:

      猜你喜欢
      • 2018-02-03
      • 2016-09-09
      • 2016-02-14
      • 1970-01-01
      • 2017-01-20
      • 2016-03-11
      • 2020-05-17
      • 1970-01-01
      • 2021-05-01
      相关资源
      最近更新 更多