【发布时间】:2021-05-05 21:43:45
【问题描述】:
我开发了一个 Python Kafka 生产者,它将多个 json 记录作为 nd-json 二进制字符串发送到 Kafka 主题。 然后我尝试使用 PySpark 在 Spark Structured Streaming 中读取这些消息,如下所示:
events_df = select(from_json(col("value").cast("string"), schema).alias("value"))
但此代码仅适用于单个 json 文档。 如果该值包含多条记录作为换行符分隔的 json,Spark 无法正确解码。
我不想为每个事件发送 kafka 消息。我怎样才能做到这一点?
【问题讨论】:
-
好吧,这里的
schema是什么?为什么不能将单个对象作为消息发送?如果该模式仅表示单个 JSON 对象,则该模式将成为问题......并且from_json也不适用于 ndjson。否则,如果您可以map对记录进行拆分功能,以便将其变为单独的记录,那么您应该在此处执行此操作 -
我通过从磁盘导入单个事件 json 自动生成了模式。如何拆分 kafka 消息的值,然后使用正确的模式正确解析 json?从磁盘读取实际上支持 ndjson 和模式推断
-
就像我说的那样,
schema是 Struct,而不是 ndjson,AFAIK 没有有效的模式类型。解决方法是通过平面图分割新行上的记录,然后将它们表示为单独的数据帧行。您仍然没有说明为什么不想发送单个消息(请记住,Kafka 不是用于“文件传输”,因此您不应该比较从磁盘读取和从 Kafka 消费) -
在实时流中,海量数据源源不断地到来,将数据拆分并一个一个事件发送是不可想象的。
-
是吗?您的回答正是这样做的。如果您确实希望消息中有多个对象,请使用适当的数组
标签: apache-spark pyspark apache-kafka spark-structured-streaming ndjson