【发布时间】:2018-06-11 23:52:05
【问题描述】:
我是 python 和 kafka 的新手。我有一个脚本应该启动三个 kafka 消费者,等待来自这些消费者的消息并做一些其他事情。在这一点上,我什至不知道我是否朝着正确的方向前进,因此我们将不胜感激。
class MainClass():
def do_something_before(self):
# something is done here
def start_consumer(self):
consumer1_thread = threading.Thread(target=self.cons1, args=())
consumer2_thread = threading.Thread(target=self.cons2, args=())
consumer1_thread.daemon = True
consumer2_thread.daemon = True
consumer1_thread.start()
consumer2_thread.start()
def cons1(self):
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
auto_offset_reset='earliest')
consumer.subscribe(['my-topic'])
for message in consumer:
print(message.value)
def cons2(self):
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
auto_offset_reset='earliest')
consumer.subscribe(['my2-topic'])
for message in consumer:
print(message.value)
def keep_working(self):
# something is done here
if __name__ == 'main':
g = MainClass()
g.do_something_before()
g.keep_working()
【问题讨论】:
-
您能否更清楚地说明您希望我们在这里提供什么帮助。你有错误吗?代码的行为不符合您的预期吗?如果是这样,你期待什么,你在观察什么?
标签: python apache-kafka kafka-python