【问题标题】:Splitting Kafka Message Line by line in Spark Structured Streaming在 Spark Structured Streaming 中逐行拆分 Kafka 消息行
【发布时间】:2019-02-08 11:02:42
【问题描述】:

我想将 Spark 结构化流作业中来自 Kafka 主题的消息读入数据帧。但是我在一个偏移量中获取整个消息,因此在数据帧中只有该消息进入一行而不是多行。 (在我的情况下是 3 行)

当我打印此消息时,我得到以下输出:

我想要在数据框中的 3 行中的消息“Text1”、“Text2”和“Text3”,以便我可以进一步处理。

请帮帮我。

【问题讨论】:

    标签: apache-spark pyspark spark-streaming spark-streaming-kafka


    【解决方案1】:

    您可以使用 用户定义函数 (UDF) 将消息字符串转换为字符串序列,然后对该列应用 explode 函数,以创建序列中每个元素的新行:

    如下图所示(在 scala 中,同样的原理也适用于 pyspark):

    case class KafkaMessage(offset: Long, message: String)
    
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.functions.explode
    
    val df = sc.parallelize(List(KafkaMessage(1000, "Text1\nText2\nText3"))).toDF()
    
    val splitString = udf { s: String => s.split('\n') }
    
    df.withColumn("splitMsg", explode(splitString($"message")))
      .select("offset", "splitMsg")
      .show()
    

    这将产生以下输出:

    +------+--------+
    |offset|splitMsg|
    +------+--------+
    |  1000|   Text1|
    |  1000|   Text2|
    |  1000|   Text3|
    +------+--------+
    

    【讨论】:

      猜你喜欢
      • 2021-05-05
      • 2019-04-27
      • 2020-12-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-05-22
      • 2020-07-25
      • 2017-10-30
      相关资源
      最近更新 更多