【问题标题】:Reducing values in lists of (key, val) RDD's, given these lists are values in another list of (key, val) RDD's减少(键,值)REDD列表中的值,因为这些列表是另一个(键,值)REDD列表中的值
【发布时间】:2020-01-31 04:02:43
【问题描述】:

我已经为此烦恼了一段时间 - 非常感谢任何建议! 抱歉,标题太长了,我希望我将在下面构建一个简短的示例来更好地解释这一点。

假设我们有一个如下形式的 RDD:

data = sc.parallelize([(1,[('k1',4),('k2',3),('k1',2)]),\
           (2,[('k3',1),('k3',8),('k1',6)])])
data.collect()

输出:

[(1, [('k1', 4), ('k2', 3), ('k1', 2)]),
 (2, [('k3', 1), ('k3', 8), ('k1', 6)])]

我希望使用最深的 (key,val) RDD 列表执行以下操作

.reduceByKey(lambda a, b: a + b)

(即通过 key 减少这些 RDD 的值以通过 key 获得总和,同时保留与初始更高级别 RDD 的 key 映射的结果,这将产生以下输出):

[(1, [('k1', 6), ('k2', 3)]),
 (2, [('k3', 9), ('k1', 6)])]

我对 PySpark 比较陌生,可能在这里缺少一些基本的东西,但是我已经尝试了很多不同的方法,但基本上找不到访问和 reduceByKey 列表中的 (key,val) RDD 的方法,它本身就是另一个 RDD 的值。

非常感谢!

拒绝

【问题讨论】:

    标签: apache-spark pyspark rdd


    【解决方案1】:

    您要做的是:您的值(在输入 K,V 中)是一个 iterable,您希望在其上对内部键求和并将结果返回为 =>

    (outer_key(e.g 1,2) -> List(Inner_Key(E.g."K1","K2"),Summed_value))

    如您所见,总和是在 inner Key-V 上计算的, 我们可以通过

    首先从每个列表项中剥离元素

    => 将新键设为(外键,内键)

    => 对 (outer_key,inner_key) 求和 -> 值

    => 将数据格式改回 (outer_key ->(inner_key, summed_value))

    => 最后在外键上再次分组

    我不确定 Python 是什么,但我相信只需用 python 替换 Scala 集合语法就足够了,这就是解决方案

    SCALA 版本

    scala> val keySeq = Seq((1,List(("K1",4),("K2",3),("K1",2))),
         | (2,List(("K3",1),("K3",8),("K1",6))))
    keySeq: Seq[(Int, List[(String, Int)])] = List((1,List((K1,4), (K2,3), (K1,2))), (2,List((K3,1), (K3,8), (K1,6))))
    
    scala> val inRdd = sc.parallelize(keySeq)
    inRdd: org.apache.spark.rdd.RDD[(Int, List[(String, Int)])] = ParallelCollectionRDD[111] at parallelize at <console>:26
    
    scala> inRdd.take(10)
    res64: Array[(Int, List[(String, Int)])] = Array((1,List((K1,4), (K2,3), (K1,2))), (2,List((K3,1), (K3,8), (K1,6))))
    
    
    // And solution :
    scala> inRdd.flatMap { case (i,l) => l.map(l => ((i,l._1),l._2)) }.reduceByKey(_+_).map(x => (x._1._1 ->(x._1._2,x._2))).groupByKey.map(x => (x._1,x._2.toList.sortBy(x =>x))).collect()
    
    // RESULT ::
    res65: Array[(Int, List[(String, Int)])] = Array((1,List((K1,6), (K2,3))), (2,List((K1,6), (K3,9))))
    

    更新 => Python 解决方案

    >>> data = sc.parallelize([(1,[('k1',4),('k2',3),('k1',2)]),\
    ...            (2,[('k3',1),('k3',8),('k1',6)])])
    >>> data.collect()
    [(1, [('k1', 4), ('k2', 3), ('k1', 2)]), (2, [('k3', 1), ('k3', 8), ('k1', 6)])]
    
    # Similar operation
    
    >>> data.flatMap(lambda x : [ ((x[0],y[0]),y[1]) for y in x[1]]).reduceByKey(lambda a,b : (a+b)).map(lambda x : [x[0][0],(x[0][1],x[1])]).groupByKey().mapValues(list).collect()
    
    # RESULT 
    [(1, [('k1', 6), ('k2', 3)]), (2, [('k3', 9), ('k1', 6)])]
    

    【讨论】:

    • 谢谢,不幸的是我不太熟悉将解决方案行转换为python的Scala语法,但是python语法有本质的不同
    • @DenysPrykhodko => 添加了pyspark 版本。请检查,如果满足您的要求,请接受。谢谢。
    【解决方案2】:

    您应该 .map 您的数据集而不是减少,因为示例中的行数与源数据集中的行数相同,在 map 中您可以将值减少为 python 列表

    【讨论】:

      【解决方案3】:

      使用 ma​​pValues() + itertools.groupby()

      from itertools import groupby
      
      data.mapValues(lambda x: [ (k, sum(f[1] for f in g)) for (k,g) in groupby(sorted(x), key=lambda d: d[0]) ]) \
          .collect()
      #[(1, [('k1', 6), ('k2', 3)]), (2, [('k1', 6), ('k3', 9)])]
      

      使用 itertools.groupby,我们使用元组的第一项作为分组键 k,并将每个 g 中元组的第二项相加。

      编辑:对于大型数据集,使用 itertools.groupby 进行排序很昂贵,只需编写一个不带排序的函数即可:

      def merge_tuples(x):
          d = {}
          for (k,v) in x: 
              d[k] = d.get(k,0) + v
          return d.items()
      
      data.mapValues(merge_tuples).collect()
      #[(1, [('k2', 3), ('k1', 6)]), (2, [('k3', 9), ('k1', 6)])]
      

      【讨论】:

      • 谢谢,这看起来很简单,对我有用,但是在一个非常大的数据集(最深的 (key,val) 列表)上,我知道 itertools 不会使用分布式内核和可能很慢?
      • @DenysPrykhodko,对于一个非常大的集合,排序可能很昂贵,这对于这个任务来说无论如何都不是必需的。您可以检查仅使用字典而不对数据进行排序的更新函数。
      猜你喜欢
      • 1970-01-01
      • 2016-11-24
      • 2015-01-03
      • 2013-02-24
      • 1970-01-01
      • 1970-01-01
      • 2012-01-22
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多