【发布时间】:2020-03-29 10:01:29
【问题描述】:
我有一个从 Kafka 到 Spark 的警报流。这些是来自不同 IoT 传感器的 JSON 格式的警报。
卡夫卡流:
{ "id":"2093021", alert:"故障 检测到","sensor_id":"14-23092-AS" }
{“id”:“2093021”,警报:“故障 检测到","sensor_id":"14-23092-AS" , "alarm_code": "严重" }
我的代码:spark-client.py
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext
import json
if __name__ == "__main__":
spark = SparkSession.builder.appName("myApp").config("spark.mongodb.input.uri", "mongodb://spark:1234@172.31.9.44/at_cloudcentral.spark_test").config("spark.mongodb.output.uri", "mongodb://spark:1234@172.31.9.44/at_cloudcentral.spark_test").getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 10)
zkQuorum, topic = sys.argv[1:]
kafka_streams =KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-sql-mongodb-test-consumer", {topic: 1})
dstream = kafka_streams.map(lambda x: json.loads(x[1]))
dstream.pprint()
ssc.start()
ssc.awaitTermination()
当我运行它时
ubuntu@ip-172-31-89-176:~/spark-connectors$ spark-submit spark-client.py localhost:2181 DetectionEntry
我得到这个输出
-------------------------------------------
Time: 2019-12-04 14:26:40
-------------------------------------------
{u'sensor_id': u'16-23092-AS', u'id': u'2093021', u'alert': u'Malfunction detected'}
我需要能够将此警报保存到远程 MongoDB。我有两个具体的挑战:
-
如何正确解析输出,以便创建可写入 mongodb 的数据帧?我已经尝试将其添加到代码的末尾
d = [dstream] df = spark.createDataFrame(d).collect()
它给了我这个错误
dataType py4j.java_gateway.JavaMember 对象在 0x7f5912726750 应该 是类 'pyspark.sql.types.DataType' 的一个实例
- 我的警报可以有不同的 json 结构,我需要将它们转储到 mongodb 集合中。因此,固定模式对我不起作用。我在 stackoverflow 中提到的大多数类似问题和代码都特定于固定模式,我无法弄清楚如何将其推送到 mongodb 以使 mongodb 集合中的每条记录都有自己的模式(json结构体)。请求任何指向正确方向的指针。
【问题讨论】:
-
那么您在 Kafka 主题中有数据,您想要处理这些数据并流式传输到 MongoDB?你准备好使用 Spark 了吗?因为从外观上看,Kafka Connect 将数据直接流式传输到 Mongo 会更容易。
-
由于 MongoDB 不需要固定模式,因此直接流向它会像@RobinMoffatt 提到的那样工作。查看文档:docs.mongodb.com/manual/core/data-modeling-introduction/…您还可以查看 Scala 的 Spark Streaming 编程指南以了解它:docs.mongodb.com/spark-connector/master/scala/streaming
-
将 kafka 流直接保存到 MongoDB 会存储大量不需要的日志。我希望只存储 spark 将处理和隐藏的相关日志。休息可以被忽略或转储到平面文件存储中。
标签: mongodb apache-spark pyspark apache-kafka