【发布时间】:2018-11-20 23:29:12
【问题描述】:
我搭建了一个Kafka消费者-生产者系统,我需要对传输的消息进行处理。这些是来自 JSON 文件的行,例如
ConsumerRecord(topic=u'json_data103052', partition=0, offset=676, timestamp=1542710197257, timestamp_type=0, key=None, value='{"Name": "Simone", "Surname": "Zimbolli", "gender": "Other", "email": "zzz@uiuc.edu", "country": "Nigeria", "date": "11/07/2018"}', checksum=354265828, serialized_key_size=-1, serialized_value_size=189)
我正在寻找一种易于实施的解决方案
- 定义流式窗口
- 分析窗口中的消息(统计唯一用户和类似事物的数量)
有人对如何进行有建议吗?谢谢。
我在使用 Spark 时遇到问题,所以我宁愿避免使用它。我正在使用 Jupyter 在 Python 中编写脚本。
这是我的代码:
from kafka import KafkaConsumer
from random import randint
from time import sleep
bootstrap_servers = ['localhost:9092']
%store -r topicName # Get the topic name from the kafka producer
print topicName
consumer = KafkaConsumer(bootstrap_servers = bootstrap_servers,
auto_offset_reset='earliest'
)
consumer.subscribe([topicName])
for message in consumer:
print (message)
【问题讨论】:
-
您可以考虑使用 kafka-stream API,它似乎对您的窗口场景非常有用
-
在回答中添加了更多详细信息
标签: python json apache-kafka kafka-consumer-api