【Question title】: pyspark - TypeError: 'Price' object is not iterable
【Posted】: 2018-08-25 09:30:33
【Question】:

I get a TypeError when I try to convert an RDD of Price objects into a pair RDD.

Sample code:

priceRDD = pppConformInDF.rdd.map(lambda row: Price(row.vyge_id, row.strm_typ_cd, row.sfb_nm, row.txn_dt, row.vfa_extra_am, '2'))
priceKeyValueRDD = priceRDD.map(lambda price: (",".join([price.vyge_id, price.strm_typ_cd, price.sfb_nm]), list(price)))

How can I convert an RDD of Price objects into a key-value RDD, where the value is the Price object and the key is a combination of some of its fields?

Error log:

    priceKeyValueRDD = priceRDD.map(lambda price: (",".join([price.vyge_id, price.strm_typ_cd, price.sfb_nm]), list(price)))
TypeError: 'Price' object is not iterable

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:149)
    at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:149)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

【Comments】:

    Tags: pyspark rdd


    【Solution 1】:

    The error comes from the list(price) call: a Price object is not iterable, so it cannot be converted to a list.
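
To see why list(price) fails, here is a minimal sketch in plain Python, with no Spark involved. The Price class below is a hypothetical stand-in, since the question does not show the real definition, and the field values are made up. It assumes Price is an ordinary class without an __iter__ method.

```python
from collections import namedtuple


# Hypothetical stand-in for the asker's Price class (the real one is not shown).
class Price:
    def __init__(self, vyge_id, strm_typ_cd, sfb_nm, txn_dt, vfa_extra_am, version):
        self.vyge_id = vyge_id
        self.strm_typ_cd = strm_typ_cd
        self.sfb_nm = sfb_nm
        self.txn_dt = txn_dt
        self.vfa_extra_am = vfa_extra_am
        self.version = version


price = Price("V1", "S", "A", "2018-08-25", 10.0, "2")

# A plain class defines neither __iter__ nor __getitem__, so list() rejects it.
try:
    list(price)
    error_message = None
except TypeError as exc:
    error_message = str(exc)

# The object itself is already a valid value for a (key, value) pair.
key = ",".join([price.vyge_id, price.strm_typ_cd, price.sfb_nm])
pair = (key, price)

# If a list of field values is what is actually wanted, a namedtuple is iterable.
PriceTuple = namedtuple(
    "PriceTuple", "vyge_id strm_typ_cd sfb_nm txn_dt vfa_extra_am version"
)
fields = list(PriceTuple("V1", "S", "A", "2018-08-25", 10.0, "2"))
```

Passing price itself as the value avoids the error. Switching to a namedtuple would only be needed if the individual field values have to be listed.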

    Just drop list, then use groupByKey() to group the prices that share the same key.

    priceRDD = pppConformInDF.rdd.map(lambda row: Price(row.vyge_id, row.strm_typ_cd, row.sfb_nm, row.txn_dt, row.vfa_extra_am, '2'))
    priceKeyValueRDD = priceRDD.map(lambda price: (",".join([price.vyge_id, price.strm_typ_cd, price.sfb_nm]), price))
    priceGroupedRDD = priceKeyValueRDD.groupByKey() # returns something like [(key->iterable of prices)]
    

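    【Discussion】:

    • The result comes back as a tuple whose value is a single Price object, not a list of Price objects. Am I missing something?
    • If you apply a map function to priceGroupedRDD, each row is indeed a tuple (key, iterable of prices).
    • For example, to get the length of each list: priceGroupedRDD.map(lambda x : (x[0], len(x[1])))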
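
The difference between the two RDDs discussed above can be sketched in plain Python. This is a stand-in for groupByKey() built on a dictionary, not Spark itself. The Price namedtuple, its reduced field set, and the sample rows are all assumptions made for illustration.

```python
from collections import defaultdict, namedtuple

# Hypothetical, reduced Price record; the real class is not shown in the question.
Price = namedtuple("Price", "vyge_id strm_typ_cd sfb_nm vfa_extra_am")

prices = [
    Price("V1", "S", "A", 10.0),
    Price("V1", "S", "A", 12.5),
    Price("V2", "S", "B", 7.0),
]


def make_key(price):
    # Same key construction as in the question: three fields joined by commas.
    return ",".join([price.vyge_id, price.strm_typ_cd, price.sfb_nm])


# Same shape as priceKeyValueRDD: one (key, single Price) tuple per input row.
key_value_pairs = [(make_key(p), p) for p in prices]

# Plain-Python stand-in for groupByKey(): one entry per key, holding all its prices.
grouped = defaultdict(list)
for key, price in key_value_pairs:
    grouped[key].append(price)

# Same idea as priceGroupedRDD.map(lambda x: (x[0], len(x[1]))).
counts = {key: len(group) for key, group in grouped.items()}
```

Before grouping, every value is a single Price. Only the grouped result holds a collection per key. In PySpark that collection is an iterable rather than a list, and priceGroupedRDD.mapValues(list) should turn each one into an ordinary list if that is needed.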