【发布时间】:2019-04-01 11:27:41
【问题描述】:
我有这个问题:
我正在使用结构化流从 Kafka 读取数据,数据是 CSV 行。当我从 Kafka 获取数据时,我有一个流数据帧,其中 CSV 行位于“值”内,它是一个字节序列。
sDF2 = sDF.selectExpr("CAST(value as string)").select( split("value",","))
使用这个我有一个新的数据框,其中“值”是一个字符串,它是 CSV 行。
我怎样才能获得一个新的数据框,其中我已将 CSV 字段解析并拆分为数据框列?
示例: csv 行是 "abcd,123,frgh,1321"
sDF schema, which contains the data downloaded from Kafka, is
key, value, topic, timestamp etc... and here value is a byte sequence with no type
sDF2.schema has only a column ( named value of type string )
我喜欢新的数据框是
sDF3.col1 = abcd
sDF3.col2 = 123
sDF3.col3 = frgh ...etc
所有列都是字符串。
我仍然可以这样做:
sDF3 = sDF2.select( sDF2.csv[0].alias("EventId").cast("string"),
sDF2.csv[1].alias("DOEntitlementId").cast("string"),
sDF2.csv[3].alias("AmazonSubscriptionId").cast("string"),
sDF2.csv[4].alias("AmazonPlanId").cast("string"),
... etc ...
但它看起来很丑。
【问题讨论】:
标签: apache-spark dataframe apache-kafka streaming