【问题标题】:Splitting Kafka Message Line by line in Spark Structured Streaming在 Spark Structured Streaming 中逐行拆分 Kafka 消息行
【发布时间】:2019-02-08 11:02:42
【问题描述】:
我想将 Spark 结构化流作业中来自 Kafka 主题的消息读入数据帧。但是我在一个偏移量中获取整个消息,因此在数据帧中只有该消息进入一行而不是多行。 (在我的情况下是 3 行)
当我打印此消息时,我得到以下输出:
我想要在数据框中的 3 行中的消息“Text1”、“Text2”和“Text3”,以便我可以进一步处理。
请帮帮我。
【问题讨论】:
标签:
apache-spark
pyspark
spark-streaming
spark-streaming-kafka
【解决方案1】:
您可以使用 用户定义函数 (UDF) 将消息字符串转换为字符串序列,然后对该列应用 explode 函数,以创建序列中每个元素的新行:
如下图所示(在 scala 中,同样的原理也适用于 pyspark):
case class KafkaMessage(offset: Long, message: String)
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.explode
val df = sc.parallelize(List(KafkaMessage(1000, "Text1\nText2\nText3"))).toDF()
val splitString = udf { s: String => s.split('\n') }
df.withColumn("splitMsg", explode(splitString($"message")))
.select("offset", "splitMsg")
.show()
这将产生以下输出:
+------+--------+
|offset|splitMsg|
+------+--------+
| 1000| Text1|
| 1000| Text2|
| 1000| Text3|
+------+--------+