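【Question title】: Kafka python consumers running in parallel threads
【Posted】: 2018-06-11 23:52:05
【Question】:

I am new to Python and Kafka. I have a script that should start three Kafka consumers, wait for messages from those consumers, and do some other things. At this point I don't even know whether I am heading in the right direction, so any help would be appreciated.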

import threading

from kafka import KafkaConsumer


class MainClass():
    def do_something_before(self):
        # something is done here
        pass

    def start_consumer(self):
        consumer1_thread = threading.Thread(target=self.cons1, args=())
        consumer2_thread = threading.Thread(target=self.cons2, args=())
        # daemon threads are killed as soon as the main thread exits
        consumer1_thread.daemon = True
        consumer2_thread.daemon = True
        consumer1_thread.start()
        consumer2_thread.start()

    def cons1(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest')
        consumer.subscribe(['my-topic'])
        for message in consumer:
            print(message.value)

    def cons2(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest')
        consumer.subscribe(['my2-topic'])
        for message in consumer:
            print(message.value)

    def keep_working(self):
        # something is done here
        pass

if __name__ == '__main__':
    g = MainClass()
    g.do_something_before()
    # launch the consumer threads before continuing with other work
    g.start_consumer()
    g.keep_working()

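【Comments】:

  • Could you state more clearly what you would like us to help with here? Are you getting an error? Does the code not behave as you expect? If so, what do you expect, and what are you observing?

Tags: python apache-kafka kafka-python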


【Answer 1】:

I have added a python-kafka example with 2 consumers (basically two Python processes); you can find it on GitHub: https://github.com/Shubhamgorde/kafka-python-app

I can't post the whole Python file here because it is a bit large.

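import ast
import binascii
from multiprocessing import Process

from kafka import KafkaConsumer


def consumeData(topic):
    try:
        # Values arrive hex-encoded; decode them to UTF-8 text.
        consumer = KafkaConsumer(topic, value_deserializer=lambda v:
                                 binascii.unhexlify(v).decode('utf-8'))
    except Exception:
        print("Error!!")
        return  # without a consumer there is nothing to iterate over

    for msg in consumer:
        # Each value is the text of a Python literal; index 2 holds the type.
        msg = ast.literal_eval(msg.value)
        if msg[2] == 'C':
            performCreditOperation(msg)
        elif msg[2] == 'D':
            performDebitOperation(msg)


# performCreditOperation / performDebitOperation are not shown here;
# see the linked repository.
if __name__ == '__main__':
    t1 = Process(target=consumeData, args=('Credit_transac',))
    t2 = Process(target=consumeData, args=('Debit_transac',))
    t1.start()
    t2.start()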
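
The decoding and dispatch steps in the loop above can be tried without a broker. This is a minimal sketch, assuming each payload is a hex-encoded Python tuple literal whose third field is 'C' or 'D' (inferred from the snippet, not confirmed by the answer). The names decode_value, route and HANDLERS, the sample record, and the two lambda handlers are hypothetical stand-ins for performCreditOperation and performDebitOperation.

```python
import ast
import binascii


def decode_value(raw):
    """Mirror the value_deserializer above: hex bytes -> UTF-8 text."""
    return binascii.unhexlify(raw).decode('utf-8')


def route(text, handlers):
    """Parse the text as a Python literal and dispatch on its third field."""
    record = ast.literal_eval(text)
    kind = record[2]
    handler = handlers.get(kind)
    if handler is None:
        raise ValueError('unknown transaction type: {!r}'.format(kind))
    return handler(record)


# Hypothetical handlers; the real ones live in the answer's repository.
HANDLERS = {
    'C': lambda rec: ('credit', rec[1]),
    'D': lambda rec: ('debit', rec[1]),
}

# Hypothetical sample record: (account id, amount, type), hex-encoded
# the way the deserializer expects to receive it.
sample = binascii.hexlify(repr(('acc-1', 100, 'C')).encode('utf-8'))
result = route(decode_value(sample), HANDLERS)
```

Keeping the parsing separate from the consumer loop makes it possible to test the routing logic on its own, and an unknown type raises an error instead of being silently skipped.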

【Comments】:

【Answer 2】:

Here is my implementation. I hope it is useful to you.

from confluent_kafka import Consumer, KafkaError, KafkaException


class ConsumerThread:
    def __init__(self, config, topics):
        self.config = config
        self.topics = topics

    def readData(self):
        # Create the consumer where it is used, so each caller gets its own.
        consumer = Consumer(self.config)
        consumer.subscribe(self.topics)
        self.run(consumer)

    def process_msg(self, msg):
        print('Received message.')
        print('Key: {}, Val: {}'.format(msg.key(), msg.value()))
        print('Partition: {}, Offset: {}'.format(msg.partition(), msg.offset()))

    def run(self, consumer):
        try:
            while True:
                # poll() returns None when nothing arrives within the timeout
                msg = consumer.poll(0.1)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        print('End of partition reached {0}/{1}'
                              .format(msg.topic(), msg.partition()))
                    else:
                        raise KafkaException(msg.error())
                else:
                    self.process_msg(msg)

        except KeyboardInterrupt:
            print("Detected Keyboard Interrupt. Cancelling.")

        finally:
            # Always leave the group and release resources on exit.
            consumer.close()
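
To tie this back to the question, a polling loop like run() can be placed on one background thread per topic while the main thread keeps working. This is a minimal sketch using only the standard library. FakeConsumer is a hypothetical stand-in, not a real client API, so the example runs without a broker; it only mimics a poll(timeout) that returns None when nothing is available. The consume function and the stop event are likewise assumptions for illustration.

```python
import queue
import threading


class FakeConsumer:
    """Hypothetical stand-in for a Kafka consumer; not a real client API."""

    def __init__(self, messages):
        self._q = queue.Queue()
        for m in messages:
            self._q.put(m)

    def poll(self, timeout):
        # Return the next message, or None if none arrives within the timeout.
        try:
            return self._q.get(timeout=timeout)
        except queue.Empty:
            return None


def consume(name, consumer, stop, out):
    """Poll until asked to stop; each thread owns its own consumer."""
    while not stop.is_set():
        msg = consumer.poll(0.05)
        if msg is None:
            continue
        out.put((name, msg))


stop = threading.Event()
received = queue.Queue()
consumers = {
    'my-topic': FakeConsumer(['a', 'b']),
    'my2-topic': FakeConsumer(['c']),
}
threads = [
    threading.Thread(target=consume, args=(name, c, stop, received))
    for name, c in consumers.items()
]
for t in threads:
    t.start()

# The main thread is free to do other work; here it collects the three messages.
results = sorted(received.get(timeout=2) for _ in range(3))

# Signal the loops to finish and wait for them, instead of relying on daemon threads.
stop.set()
for t in threads:
    t.join()
```

Using an Event together with join() rather than daemon threads lets each loop exit on its own terms, which is the point where a real consumer would call close().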
    

【Comments】:
