【问题标题】:Parse Kafka JSON stream using pyspark and save to mongodb使用 pyspark 解析 Kafka JSON 流并保存到 mongodb
【发布时间】:2020-03-29 10:01:29
【问题描述】:

我有一个从 Kafka 到 Spark 的警报流。这些是来自不同 IoT 传感器的 JSON 格式的警报。

卡夫卡流:

{ "id":"2093021", alert:"故障 检测到","sensor_id":"14-23092-AS" }
{“id”:“2093021”,警报:“故障 检测到","sensor_id":"14-23092-AS" , "alarm_code": "严重" }

我的代码:spark-client.py

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext
import json

if __name__ == "__main__":
    spark = SparkSession.builder.appName("myApp").config("spark.mongodb.input.uri", "mongodb://spark:1234@172.31.9.44/at_cloudcentral.spark_test").config("spark.mongodb.output.uri", "mongodb://spark:1234@172.31.9.44/at_cloudcentral.spark_test").getOrCreate()
    sc =  spark.sparkContext
    ssc = StreamingContext(sc, 10)   

zkQuorum, topic = sys.argv[1:]

kafka_streams =KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-sql-mongodb-test-consumer", {topic: 1})

dstream = kafka_streams.map(lambda x: json.loads(x[1]))
dstream.pprint()



ssc.start()
ssc.awaitTermination()

当我运行它时

ubuntu@ip-172-31-89-176:~/spark-connectors$ spark-submit spark-client.py localhost:2181 DetectionEntry

我得到这个输出

-------------------------------------------
Time: 2019-12-04 14:26:40
-------------------------------------------
{u'sensor_id': u'16-23092-AS', u'id': u'2093021', u'alert': u'Malfunction detected'}

我需要能够将此警报保存到远程 MongoDB。我有两个具体的挑战:

  1. 如何正确解析输出,以便创建可写入 mongodb 的数据帧?我已经尝试将其添加到代码的末尾

    d = [dstream] df = spark.createDataFrame(d).collect()

它给了我这个错误

dataType py4j.java_gateway.JavaMember 对象在 0x7f5912726750 应该 是类 'pyspark.sql.types.DataType' 的一个实例

  1. 我的警报可以有不同的 json 结构,我需要将它们转储到 mongodb 集合中。因此,固定模式对我不起作用。我在 stackoverflow 中提到的大多数类似问题和代码都特定于固定模式,我无法弄清楚如何将其推送到 mongodb 以使 mongodb 集合中的每条记录都有自己的模式(json结构体)。请求任何指向正确方向的指针。

【问题讨论】:

  • 那么您在 Kafka 主题中有数据,您想要处理这些数据并流式传输到 MongoDB?你准备好使用 Spark 了吗?因为从外观上看,Kafka Connect 将数据直接流式传输到 Mongo 会更容易。
  • 由于 MongoDB 不需要固定模式,因此直接流向它会像@RobinMoffatt 提到的那样工作。查看文档:docs.mongodb.com/manual/core/data-modeling-introduction/…您还可以查看 Scala 的 Spark Streaming 编程指南以了解它:docs.mongodb.com/spark-connector/master/scala/streaming
  • 将 kafka 流直接保存到 MongoDB 会存储大量不需要的日志。我希望只存储 spark 将处理和隐藏的相关日志。休息可以被忽略或转储到平面文件存储中。

标签: mongodb apache-spark pyspark apache-kafka


【解决方案1】:

我们可以通过基于 Pyspark 的结构化流 API 调用简单的 UDF 轻松解析 Kafka JSON 消息。您可以在下面的堆栈溢出链接中查看完整代码以供参考。 Pyspark Structured streaming processing

【讨论】:

    猜你喜欢
    • 2018-08-02
    • 1970-01-01
    • 2018-03-21
    • 1970-01-01
    • 1970-01-01
    • 2016-08-08
    • 2013-02-22
    • 2015-09-23
    • 2019-10-26
    相关资源
    最近更新 更多