【问题标题】:Analyze messages from Kafka consumer分析来自 Kafka 消费者的消息
【发布时间】:2018-11-20 23:29:12
【问题描述】:

我搭建了一个Kafka消费者-生产者系统,我需要对传输的消息进行处理。这些是来自 JSON 文件的行,例如

ConsumerRecord(topic=u'json_data103052', partition=0, offset=676, timestamp=1542710197257, timestamp_type=0, key=None, value='{"Name": "Simone", "Surname": "Zimbolli", "gender": "Other", "email": "zzz@uiuc.edu", "country": "Nigeria", "date": "11/07/2018"}', checksum=354265828, serialized_key_size=-1, serialized_value_size=189)

我正在寻找一种易于实施的解决方案

  • 定义流式窗口
  • 分析窗口中的消息(统计唯一用户和类似事物的数量)

有人对如何进行有建议吗?谢谢。

我在使用 Spark 时遇到问题,所以我宁愿避免使用它。我正在使用 Jupyter 在 Python 中编写脚本。

这是我的代码:

from kafka import KafkaConsumer
from random import randint
from time import sleep

bootstrap_servers = ['localhost:9092']

%store -r topicName    # Get the topic name from the kafka producer
print topicName

consumer = KafkaConsumer(bootstrap_servers = bootstrap_servers,
                         auto_offset_reset='earliest'
                        )
consumer.subscribe([topicName])

for message in consumer:
    print (message)

【问题讨论】:

  • 您可以考虑使用 kafka-stream API,它似乎对您的窗口场景非常有用
  • 在回答中添加了更多详细信息

标签: python json apache-kafka kafka-consumer-api


【解决方案1】:

我猜你需要使用 Kafka Streams API。您拥有开窗所需的所有功能。 您可以在此处找到有关 Kafka Streams 的更多信息:

https://kafka.apache.org/documentation/streams/

【讨论】:

  • OP 似乎在寻求 Python 解决方案,不过
【解决方案2】:

对于您的场景,Kafka Streams 似乎很合适。它支持 windowing 有以下 4 种类型:

Tumbling time window - Time-based   Fixed-size, non-overlapping, gap-less windows
Hopping time window- Time-based Fixed-size, overlapping windows
Sliding time window- Time-based Fixed-size, overlapping windows that work on differences between record timestamps
Session window

对于python,有库:https://github.com/wintoncode/winton-kafka-streams

这对你很有用。

【讨论】:

    猜你喜欢
    • 2019-07-01
    • 2016-01-21
    • 2020-01-10
    • 2017-04-14
    • 2017-09-23
    • 1970-01-01
    • 1970-01-01
    • 2021-07-09
    • 1970-01-01
    相关资源
    最近更新 更多