【发布时间】:2020-01-29 17:00:35
【问题描述】:
我刚刚学习 Spark,从 RDD 开始,现在转向 DataFrame。在我当前的 pyspark 项目中,我正在将 S3 文件读入 RDD 并对其运行一些简单的转换。这是代码。
segmentsRDD = sc.textFile(fileLocation). \
filter(lambda line: line.split(",")[6] in INCLUDE_SITES). \
filter(lambda line: line.split(",")[2] not in EXCLUDE_MARKETS). \
filter(lambda line: "null" not in line). \
map(splitComma). \
filter(lambda line: line.split(",")[5] == '1')
SplitComma 是一个函数,它对行数据进行一些日期计算并返回 10 个逗号分隔的字段。一旦我得到它,我就会运行最后一个过滤器,如图所示只拾取字段 [5] = 1 中的值的行。到目前为止一切都很好。
接下来,我想将segmentsRDD 转换为具有如下所示模式的DF。
interim_segmentsDF = segmentsRDD.map(lambda x: x.split(",")).toDF("itemid","market","itemkey","start_offset","end_offset","time_shifted","day_shifted","tmsmarketid","caption","itemstarttime")
但我收到一个关于无法将“pyspark.rdd.PipelinedRDD”转换为 DataFrame 的错误。你能解释一下“pyspark.rdd.PipelinedRDD”和“row RDD”之间的区别吗?我正在尝试使用如图所示的模式转换为 DF。我在这里错过了什么?
谢谢
【问题讨论】:
标签: pyspark apache-spark-sql rdd