【问题标题】:create pyspark rdd with lambda使用 lambda 创建 pyspark rdd
【发布时间】:2019-11-02 04:28:22
【问题描述】:

我要统计每个数字的百分比。

rdd1=sc.parallelize([1,2,3,4,1,5,7,3])

我试过了

rdd2=rdd1.map(lambda x: (x, 1)).reduceByKey(lambda current, next: (current+next))

得到 rdd2.collect(): [(1,2),(2,1),(3,2),(4,1),(5,1),(7,1)] 那么

percentage=rdd2.map(lambda x:(x[0],(x[1]/rdd1.count())))
print(percentage.collect())

打印步骤出错 然后我尝试了

percentage=rdd2.map(lambda x:(x[0],(x[1]/len(rdd1.collect()))))
print(percentage.collect())

打印步骤也有错误。

【问题讨论】:

  • 你到底想做什么?
  • rdd1中每个数字的百分比,比如1的百分比是2/8=25%

标签: lambda pyspark rdd


【解决方案1】:

我从你所说的中提取你想要 RDD 每个唯一成员的relative frequency

from operator import add

rdd1 = sc.parallelize([1,2,3,4,1,5,7,3])
count = rdd1.count()

rdd2=rdd1
    .map(lambda x: (x, 1))  # [(1,1),(2,1),(3,1),(4,1),(1,1),(5,1),(7,1),(3,1)]
    .reduceByKey(add)       # [(1,2),(2,1),(3,2),(4,1),(5,1),(7,1)]
    .mapValues( lambda vSum : vSum / count ) 

rdd2.collect()
# [(1,2/8),(2,1/8),(3,2/8),(4,1/8),(5,1/8),(7,1/8)]

【讨论】:

    【解决方案2】:

    SPARK-5603 表示不支持嵌套的 RDD 操作。

    您不能在转换中引用 RDD 操作:

    如果您事先调用 count() 的操作,您的代码将起作用。

    rdd1 = sc.parallelize([1,2,3,4,1,5,7,3])
    rdd2 = rdd1.map(lambda x: (x, 1)).reduceByKey(lambda current, next: (current+next))
    rdd1_len = rdd1.count()
    percentage=rdd2.map(lambda x:(x[0],(x[1]/rdd1_len)))
    
    percentage.collect()
    # [(1, 0.25), (2, 0.125), (3, 0.25), (4, 0.125), (5, 0.125), (7, 0.125)]
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-12-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-01-17
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多