【Question title】: How to aggregate JSON data in a Kafka consumer using Python?
【Posted】: 2020-09-23 14:43:24
【Question】:

The data I produce to the Kafka `Transactions` topic looks like this:

ConsumerRecord(topic='Transactions', partition=0, offset=3, timestamp=1591277946735, timestamp_type=0, key=None, value={'transaction_id': '9495601361', 'account_number': 14, 'transaction_reference ': '20070', 'transaction_datetime': '2020-06-04T19:09:06.735129', 'amount': 260.93}, headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=160, serialized_header_size=-1)

ConsumerRecord(topic='Transactions', partition=0, offset=4, timestamp=1591277946736, timestamp_type=0, key=None, value={'transaction_id': '4952940859', 'account_number': 14, 'transaction_reference ': '44291', 'transaction_datetime': '2020-06-04T19:09:06.736128', 'amount': 2.82}, headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=158, serialized_header_size=-1)

ConsumerRecord(topic='Transactions', partition=0, offset=5, timestamp=1591277946737, timestamp_type=0, key=None, value={'transaction_id': '0193362270', 'account_number': 12, 'transaction_reference ': '96312', 'transaction_datetime': '2020-06-04T19:09:06.736128', 'amount': 766.95}, headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=160, serialized_header_size=-1)

The consumer code I have written so far:

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')))
consumer.subscribe(['Transactions'])
for message in consumer:
    print(message)
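In the consumer above, `value_deserializer` is applied to each raw message body, so `message.value` is already a Python `dict` rather than a JSON string. A minimal sketch of that conversion on its own; the byte string is hand-written for illustration (modelled on offset 4, trimmed to three fields), and `deserialize` is a hypothetical helper name:

```python
import json


def deserialize(raw: bytes) -> dict:
    """Same conversion as the consumer's value_deserializer: UTF-8 bytes -> dict."""
    return json.loads(raw.decode('utf-8'))


# Hand-written example payload (not captured from the topic), trimmed to three fields.
raw = b'{"transaction_id": "4952940859", "account_number": 14, "amount": 2.82}'
value = deserialize(raw)
print(value['account_number'], value['amount'])  # prints: 14 2.82
```

Because `account_number` arrives as a JSON number, it becomes a Python `int`, so any dictionary keyed on it will have integer keys.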

I want output as tuples like `(account_number, sum(amount))`. How can I achieve this?

【Question comments】:

    Tags: python apache-kafka kafka-consumer-api kafka-producer-api kafka-python


    【Answer 1】:

    I think a dictionary is more useful than tuples for grouping the data, and `collections.defaultdict` is well suited to this:

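    from collections import defaultdict
    
    # account_number -> running sum of amount; int() supplies the 0 start value
    accounts = defaultdict(int)
    
    # Note: iterating a KafkaConsumer blocks indefinitely by default, so the code
    # after this loop only runs if the consumer was created with consumer_timeout_ms
    # (or you break out of the loop yourself).
    for message in consumer:
        payload = message.value
        account = payload['account_number']
        amount = payload['amount']
    
        accounts[account] += amount
    
    print(accounts)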
    
    With only the three sample records from the question, this prints:
    
    defaultdict(<class 'int'>, {14: 263.75, 12: 766.95})
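Since a plain `for message in consumer` loop never ends, one way to get a final total is to let the consumer stop once the topic has been idle for a while. A minimal sketch, assuming kafka-python's `consumer_timeout_ms` option (iteration stops after that many milliseconds without a new message); `sum_by_account`, `consume_and_sum`, `idle_ms` and the 5000 ms default are hypothetical choices for illustration. The summing is kept in a separate function so it can be tried without a broker:

```python
from collections import defaultdict
from typing import Iterable


def sum_by_account(values: Iterable[dict]) -> dict:
    """Sum 'amount' per 'account_number' over already-deserialized message values."""
    totals = defaultdict(float)
    for payload in values:
        totals[payload['account_number']] += payload['amount']
    return dict(totals)


def consume_and_sum(bootstrap='localhost:9092', idle_ms=5000):
    """Read the Transactions topic until no message arrives for idle_ms, then sum."""
    # Imported here so sum_by_account can be used without kafka-python installed.
    import json
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'Transactions',
        bootstrap_servers=bootstrap,
        auto_offset_reset='earliest',
        consumer_timeout_ms=idle_ms,  # stop iterating after idle_ms with no new message
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    )
    try:
        return sum_by_account(message.value for message in consumer)
    finally:
        consumer.close()


if __name__ == '__main__':
    # Values from the three sample records in the question.
    sample = [
        {'account_number': 14, 'amount': 260.93},
        {'account_number': 14, 'amount': 2.82},
        {'account_number': 12, 'amount': 766.95},
    ]
    print(sum_by_account(sample))  # prints: {14: 263.75, 12: 766.95}
```

This suits a batch-style "sum what is in the topic now" job; for a topic that keeps receiving data, the idle timeout may never fire.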
    

    To get the tuples you are looking for, iterate over `accounts.items()` after the loop:

    for info in accounts.items():
        print(info)
    
    (14, 263.75)
    (12, 766.95)
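Summing floats can leave long tails such as `1773.8899999999999` (seen in the comments below). A minimal sketch that builds the `(account_number, total)` tuples as a sorted list and rounds them for display; `as_tuples` is a hypothetical helper, and the input dict is illustrative, combining the sample totals with the account-18 total reported in the comments:

```python
def as_tuples(totals: dict, ndigits: int = 2) -> list:
    """Turn {account: total} into (account_number, total) tuples sorted by account, rounded for display."""
    return [(account, round(total, ndigits)) for account, total in sorted(totals.items())]


totals = {14: 263.75, 12: 766.95, 18: 1773.8899999999999}
print(as_tuples(totals))  # prints: [(12, 766.95), (14, 263.75), (18, 1773.89)]
```

Rounding only tidies the output. If exact cents matter, accumulating `decimal.Decimal(str(amount))` instead of floats avoids the error in the first place.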
    

    【Comments】:

    • C.Nivs, thanks for your answer. I get defaultdict(<class 'int'>, {20: 433.13, 14: 582.7, 10: 895.99, 18: 1773.8899999999999, 2: 858.42, 8: 222.71, 13: 746.32}), but can't we get the output as the required tuples?
    • Not inside your loop. However, you can iterate over accounts.items() after the loop, which gives you the tuples you want.
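Final totals are only known once the loop ends, but a running total can be emitted as an `(account_number, total_so_far)` tuple for every message, inside the loop. A minimal sketch; `running_totals` is a hypothetical generator, and `SimpleNamespace` objects stand in for `ConsumerRecord`s since only the `.value` attribute is used:

```python
from collections import defaultdict
from types import SimpleNamespace
from typing import Iterable, Iterator, Tuple


def running_totals(messages: Iterable) -> Iterator[Tuple[int, float]]:
    """Yield (account_number, total_so_far) after each message."""
    totals = defaultdict(float)
    for message in messages:
        payload = message.value
        account = payload['account_number']
        totals[account] += payload['amount']
        yield (account, totals[account])


# Stand-ins for ConsumerRecord objects: only the .value attribute is used.
fake_messages = [
    SimpleNamespace(value={'account_number': 14, 'amount': 260.93}),
    SimpleNamespace(value={'account_number': 14, 'amount': 2.82}),
    SimpleNamespace(value={'account_number': 12, 'amount': 766.95}),
]
for pair in running_totals(fake_messages):
    print(pair)
# prints:
# (14, 260.93)
# (14, 263.75)
# (12, 766.95)
```

With a real consumer the call is the same, `for pair in running_totals(consumer): print(pair)`, and it keeps printing as new transactions arrive.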