【问题标题】:Partition RDD into tuples of length n将 RDD 划分为长度为 n 的元组
【发布时间】:2016-01-08 20:11:32
【问题描述】:

我对 Apache Spark 和 Python 比较陌生,想知道我将要描述的内容是否可行?

我有一个 [m1, m2, m3, m4 形式的 RDD , m5, m6.......mn] (当你运行 rdd.collect() 时你会得到这个) .我想知道是否可以将此 RDD 转换为 [(m1, m2, m3) 形式的另一个 RDD, (m4, m5, m6).....(mn-2, m n-1, mn)]。内部元组的大小应为 k。如果 n 不能被 k 整除,则其中一个元组的元素应少于 k 个。

我尝试使用地图功能,但无法获得所需的输出。看来 map 函数只能返回一个元素数量与最初提供的 RDD 相同的 RDD。

更新:我尝试使用分区并且也能够让它工作。

rdd.map(lambda l: (l, l)).partitionBy(int(n/k)).glom().map(lambda ll: [x[0] for x in ll])

【问题讨论】:

    标签: python apache-spark pyspark rdd


    【解决方案1】:

    Olologin 的答案几乎是这样,但我相信您要做的是将您的 RDD 分组为 3 个元组,而不是将您的 RDD 分组为 3 个元组。要执行前者,请尝试以下操作:

    rdd = sc.parallelize(["e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "e10"])
    transformed = rdd.zipWithIndex().groupBy(lambda (_, i): i / 3)
                     .map(lambda (_, list): tuple([elem[0] for elem in list]))
    

    在 pyspark 中运行时,我得到以下信息:

    >>> from __future__ import print_function    
    >>> rdd = sc.parallelize(["e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "e10"])
    >>> transformed = rdd.zipWithIndex().groupBy(lambda (_, i): i / 3).map(lambda (_, list): tuple([elem[0] for elem in list]))
    >>> transformed.foreach(print)
    ...
    ('e4', 'e5', 'e6')
    ('e10',)
    ('e7', 'e8', 'e9')
    ('e1', 'e2', 'e3')
    

    【讨论】:

    • 是的,这就是我想要做的。您的解决方案有效。我还尝试使用 partitionBy() 和 glom() 进行此操作,并且还能够使其正常工作。
    【解决方案2】:

    我假设您使用的是 pyspark api: 我不知道这是否是最好的解决方案,但我认为这可以通过以下方式完成: zipWithIndex groupBy 和简单的地图。

    # 3 - your grouping k
    # ci - list of tuples (char, idx)
    rdd = sc.parallelize(["a", "b", "c", "d", "e"]).zipWithIndex()\
            .groupBy(lambda (char, idx): idx/3 )\
            .map(lambda (remainder, ci):tuple([char for char, idx in ci]))\
            .collect()
    print rdd
    

    输出:

    [('a', 'b', 'c'), ('d', 'e')]
    

    UPD:感谢@Rohan Aletty 纠正了我。

    【讨论】:

      【解决方案3】:

      可以在不改组的情况下处理此问题 (groupBy),但与 OlologinRohan Aletty 的解决方案相比,它需要更多代码。一个整体的想法是只传输保持分区之间连续性所需的部分:

      from toolz import partition, drop, take, concatv
      
      
      def grouped(self, n, pad=None):
          """
          Group RDD into tuples of size n
      
          >>> rdd = sc.parallelize(range(10))
          >>> grouped(rdd, 3).collect()
          >>> [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9, None, None)]
          """
          assert isinstance(n, int)
          assert n > 0
      
          def _analyze(i, iter):
              """
              Given partition idx and iterator return a tuple
              (idx, numbe-of-elements prefix-of-size-(n-1))
              """
              xs = [x for x in iter]
              return [(i, len(xs), xs[:n - 1])]
      
          def _compact(prefixes, prefix):
              """
              'Compact' a list of prefixes to compensate for
              partitions with less than (n-1) elements
              """
              return prefixes + [(prefix + prefixes[-1])[:n-1]]
      
          def _compute(prvs, cnt):
              """
              Compute number of elements to drop from current and
              take from the next parition given previous state
              """
              left_to_drop, _to_drop, _to_take = prvs[-1]
              diff = cnt - left_to_drop
      
              if diff <= 0:
                  return prvs + [(-diff, cnt, 0)]
      
              else:
                  to_take = (n - diff % n) % n
                  return prvs + [(to_take, left_to_drop, to_take)]
      
          def _group_partition(i, iter):
              """
              Return grouped entries for a given partition
              """
              (_, to_drop, to_take), next_head = heads_bd.value[i]
              return partition(n, concatv(
                  drop(to_drop, iter), take(to_take, next_head)), pad=pad)
      
          if n == 1:
              return self.map(lambda x: (x, ))
      
          idxs, counts, prefixes = zip(
              *self.mapPartitionsWithIndex(_analyze).collect())
      
          heads_bd = self.context.broadcast({x[0]: (x[1], x[2]) for x in zip(idxs,
              reduce(_compute, counts, [(0, None, None)])[1:],
              reduce(_compact, prefixes[::-1], [[]])[::-1][1:])})
      
          return self.mapPartitionsWithIndex(_group_partition) 
      

      它在很大程度上依赖于出色的 toolz 库,但如果您希望避免外部依赖,您可以使用标准库轻松重写它。

      示例用法:

      >>> rdd = sc.parallelize(range(10))
      >>> grouped(rdd, 3).collect()
      [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9, None, None)]
      

      如果你想保持一致的 API,你可以猴子补丁 RDD 类:

      >>> from  pyspark.rdd import RDD
      >>> RDD.grouped = grouped
      >>> rdd.grouped(4).collect()
      [(0, 1, 2, 3), (4, 5, 6, 7), (8, 9, None, None)]
      

      您可以找到基本测试on GitHub

      【讨论】:

        猜你喜欢
        • 2020-03-13
        • 2012-07-04
        • 1970-01-01
        • 1970-01-01
        • 2014-04-12
        • 1970-01-01
        • 1970-01-01
        • 2016-04-11
        • 1970-01-01
        相关资源
        最近更新 更多