【问题标题】:Expand an array to columns in spark with structured streaming使用结构化流将数组扩展到 Spark 中的列
【发布时间】:2019-04-01 11:27:41
【问题描述】:

我有这个问题:

我正在使用结构化流从 Kafka 读取数据,数据是 CSV 行。当我从 Kafka 获取数据时,我有一个流数据帧,其中 CSV 行位于“值”内,它是一个字节序列。

 sDF2 = sDF.selectExpr("CAST(value as string)").select( split("value",","))

使用这个我有一个新的数据框,其中“值”是一个字符串,它是 CSV 行。

我怎样才能获得一个新的数据框,其中我已将 CSV 字段解析并拆分为数据框列?

示例: csv 行是 "abcd,123,frgh,1321"

sDF schema, which contains the data downloaded from Kafka, is  
key, value, topic, timestamp etc... and here value is a byte sequence with no type

sDF2.schema has only a column ( named value of type string )

我喜欢新的数据框是

sDF3.col1 = abcd
sDF3.col2 = 123
sDF3.col3 = frgh ...etc

所有列都是字符串。

我仍然可以这样做:

 sDF3 = sDF2.select( sDF2.csv[0].alias("EventId").cast("string"),
 sDF2.csv[1].alias("DOEntitlementId").cast("string"),               
 sDF2.csv[3].alias("AmazonSubscriptionId").cast("string"),
 sDF2.csv[4].alias("AmazonPlanId").cast("string"),
 ... etc ... 

但它看起来很丑。

【问题讨论】:

    标签: apache-spark dataframe apache-kafka streaming


    【解决方案1】:

    我还没有尝试过,但是这样的东西应该可以工作。

    sDF2 = 
          sDF.selectExpr("CAST(value as string)")
           .alias("csv").select("csv.*")
           .select("split(value,',')[0] as DOEntitlementId", 
                   "split(value,',')[1] as AmazonSubscriptionId", 
                   "split(value,',')[2] as AmazonPlanId")
    

    【讨论】:

      猜你喜欢
      • 2022-11-14
      • 1970-01-01
      • 1970-01-01
      • 2018-01-24
      • 1970-01-01
      • 2018-03-24
      • 2017-05-04
      • 1970-01-01
      • 2021-03-05
      相关资源
      最近更新 更多