【问题标题】:Add field from RDD to other RDD将 RDD 中的字段添加到其他 RDD
【发布时间】:2017-10-29 15:20:01
【问题描述】:

我正在使用 Pyspark,并且必须使用 RDD(不是数据帧)来执行以下操作:

我有两个 RDD,rdd1,包含 100 多个带名称的字段和 rdd2,包含一个名为“city”的字段。 rdd1 和 rdd2 具有相同的行数(相同的长度)。

rdd1 是这样的:

Row(name="Jack", age=35, state="California", ...)  
Row(name"Jane", age=29, state="Florida", ...)  
...  

rdd2 是这样的:

Row(city="LA")  
Row(city="Miami")  
...

我希望 rdd1 变成:

Row(name="Jack", age=35, state="California", ..., city="LA")  
...

我尝试过的一切都失败了。有什么建议吗?

【问题讨论】:

    标签: python apache-spark pyspark rdd


    【解决方案1】:

    使用可用于 rdds 的 zip 方法。

    rdd_zip = rdd1.zip(rdd2) 
    
    #Flatten the rdd
    rdd_final = rdd_zip.map(lambda x: tuple(list(x[0]) + [x[1]]))
    

    【讨论】:

    • 谢谢解答,第一行压缩了rdd,但是第二行不行!
    • @JackHoe 最后一行出现什么错误?
    • 最后一行似乎有效,但是当我调用 rdd_final.take(3) 进行查看时,它只是抛出了这个错误: Py4JJavaError: An error occurred while calling z:org.apache.spark .api.python.PythonRDD.runJob。 :org.apache.spark.SparkException:作业因阶段失败而中止:阶段 26.0 中的任务 0 失败 1 次,最近一次失败:阶段 26.0 中丢失任务 0.0(TID 45,本地主机,执行程序驱动程序):org.apache.spark .api.python.PythonException:回溯(最近一次调用最后一次):文件“D:\opt\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\wo ... TypeError: “int”对象不可迭代
    • @JackHoe 试试rdd_zip.map(lambda x: [elem for elem in x])。如果不行可以打印几行rdd_zip检查一下?
    • 现在它可以工作了,但结果并不完全是我正在寻找的 RDD_ZIP IS LIKE : [(Row(name="Jack", age=35, ...), "LA") , (Row....)] ########### RDD_FINAL IS LIKE : [[Row(name="Jack",age=35,..), "LA"], [Row. ..]] ########### 我喜欢 [(Row(name="Jack,age=35,city="LA"), (Row...)]
    猜你喜欢
    • 2015-07-06
    • 2016-06-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多