【问题标题】:PySpark groupByKey returning pyspark.resultiterable.ResultIterablePySpark groupByKey 返回 pyspark.resultiterable.ResultIterable
【发布时间】:2025-12-05 02:40:01
【问题描述】:

我试图弄清楚为什么我的 groupByKey 返回以下内容:

[(0, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a210>), (1, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a4d0>), (2, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a390>), (3, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a290>), (4, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a450>), (5, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a350>), (6, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a1d0>), (7, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a490>), (8, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a050>), (9, <pyspark.resultiterable.ResultIterable object at 0x7fc659e0a650>)]

我有如下所示的 flatMapped 值:

[(0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D'), (0, u'D')]

我只是做一个简单的:

groupRDD = columnRDD.groupByKey()

【问题讨论】:

    标签: python apache-spark pyspark


    【解决方案1】:

    例子:

    r1 = sc.parallelize([('a',1),('b',2)])
    r2 = sc.parallelize([('b',1),('d',2)])
    r1.cogroup(r2).mapValues(lambda x:tuple(reduce(add,__builtin__.map(list,x))))
    

    结果:

    [('d', (2,)), ('b', (2, 1)), ('a', (1,))]
    

    【讨论】:

      【解决方案2】:

      说你的代码是..

      ex2 = ex1.groupByKey()
      

      然后你跑..

      ex2.take(5)
      

      你会看到一个可迭代的。如果您要对这些数据做一些事情,这没关系,您可以继续前进。但是,如果您只想在继续之前先打印/查看值,那么这里有点小技巧......

      ex2.toDF().show(20, False)
      

      或者只是

      ex2.toDF().show()
      

      这将显示数据的值。你不应该使用collect(),因为这会将数据返回给驱动程序,如果你正在处理大量数据,那会让你崩溃。现在,如果 ex2 = ex1.groupByKey() 是您的最后一步,并且您希望返回这些结果,那么可以使用 collect(),但请确保您知道返回的数据量很小。

      print(ex2.collect())
      

      这是另一个关于在 RDD 上使用 collect() 的好帖子

      View RDD contents in Python Spark?

      【讨论】:

        【解决方案3】:

        除了上述答案之外,如果您想要唯一项目的排序列表,请使用以下内容:

        不同的排序值列表

        example.groupByKey().mapValues(set).mapValues(sorted)
        

        只是排序值列表

        example.groupByKey().mapValues(sorted)
        

        上述替代方案

        # List of distinct sorted items
        example.groupByKey().map(lambda x: (x[0], sorted(set(x[1]))))
        
        # just sorted list of items
        example.groupByKey().map(lambda x: (x[0], sorted(x[1])))
        

        【讨论】:

          【解决方案4】:

          你也可以使用

          example.groupByKey().mapValues(list)
          

          【讨论】:

            【解决方案5】:

            我建议您使用 cogroup(),而不是使用 groupByKey()。你可以参考下面的例子。

            [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
            

            例子:

            >>> x = sc.parallelize([("foo", 1), ("bar", 4)])
            >>> y = sc.parallelize([("foo", -1)])
            >>> z = [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
            >>> print(z)
            

            你应该得到想要的输出...

            【讨论】:

              【解决方案6】:

              你得到的是一个允许你迭代结果的对象。您可以通过在值上调用 list() 将 groupByKey 的结果转换为列表,例如

              example = sc.parallelize([(0, u'D'), (0, u'D'), (1, u'E'), (2, u'F')])
              
              example.groupByKey().collect()
              # Gives [(0, <pyspark.resultiterable.ResultIterable object ......]
              
              example.groupByKey().map(lambda x : (x[0], list(x[1]))).collect()
              # Gives [(0, [u'D', u'D']), (1, [u'E']), (2, [u'F'])]
              

              【讨论】:

              • example.groupByKey().mapValues(list).collect() 更短,也可以使用
              • 如何映射ResultIterable类型?