【Question title】: PySpark: Create a histogram for each key in a Pair RDD
【Posted】: 2023-03-29 14:45:01
【Question】:

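I am new to PySpark. I have a Pair RDD of (key, value) pairs, and I would like to build a histogram with n buckets for each key. The output would look something like this: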

[(key1, [...buckets...], [...counts...]),
 (key2, [...buckets...], [...counts...])]

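I have seen examples that retrieve the maximum or the sum for each key, but is there a way to pass a histogram(n) function so that it is applied to the values of each key?

【Comments】:

    Tags: python apache-spark histogram pyspark rdd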


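    【Solution 1】:

    I know this post is quite old, but for anyone still looking for a PySpark solution, here are my two cents on the issue.

    Let's consider an RDD of (key, value) pairs, and assume that by "histogram" we mean a simple counter of how many distinct values each key has, together with their respective cardinalities.

    aggregateByKey() is a good choice here. It essentially takes three inputs: the default (zero) value of the aggregator, the within-partition aggregation function, and the between-partition aggregation function.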
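
    To make the three inputs concrete, here is a minimal local sketch in plain Python that imitates the aggregation over a list of partitions. The helper simulate_aggregate_by_key is hypothetical (it is not part of PySpark) and only mimics the semantics: each key starts from a fresh copy of the zero value, records are folded in within each partition, and the per-partition results are then merged. The partitions and the (sum, count) example are made up for illustration.

```python
import copy

def simulate_aggregate_by_key(partitions, zero_value, seq_func, comb_func):
    """Hypothetical local stand-in for RDD.aggregateByKey (not Spark code)."""
    merged = {}
    for partition in partitions:
        # Within one partition: fold each record into that key's accumulator.
        local = {}
        for key, value in partition:
            if key not in local:
                local[key] = copy.deepcopy(zero_value)  # fresh zero per key
            local[key] = seq_func(local[key], value)
        # Between partitions: merge this partition's accumulators into the total.
        for key, acc in local.items():
            merged[key] = comb_func(merged[key], acc) if key in merged else acc
    return sorted(merged.items())

# Example: per-key (sum, count), with two made-up partitions.
partitions = [[("a", 1), ("b", 5)], [("a", 3)]]
result = simulate_aggregate_by_key(
    partitions,
    (0, 0),                                   # zero value: (sum, count)
    lambda acc, v: (acc[0] + v, acc[1] + 1),  # within-partition function
    lambda x, y: (x[0] + y[0], x[1] + y[1]),  # between-partition function
)
print(result)  # [('a', (4, 2)), ('b', (5, 1))]
```

    The deep copy matters when the zero value is mutable, such as a dictionary: sharing one accumulator object across keys would mix their counts.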

    Let's take an RDD of the following form

    [(124, 2),
     (124, 2),
     (124, 2),
     (125, 2),
     (125, 2),
     (125, 2),
     (126, 2),
     (126, 2),
     (126, 2),
     (127, 2),
     (127, 2),
     (127, 2),
     (128, 2),
     (128, 2),
     (128, 2),
     (129, 2),
     (129, 2),
     (129, 2),
     (130, 2),
     (130, 2),
     (130, 2),
     (131, 2),
     (131, 2),
     (131, 2),
     (132, 2),
     (132, 2),
     (132, 2),
     (133, 2),
     (133, 2),
     (133, 2),
     (134, 2),
     (134, 2),
     (134, 2),
     (135, 2),
     (135, 2),
     (135, 2),
     (136, 2),
     (136, 1),
     (136, 2),
     (137, 2),
     (137, 2),
     (137, 2),
     (138, 2),
     (138, 2),
     (138, 2),
     (139, 2),
     (139, 2),
     (139, 2),
     (140, 2),
     (140, 2),
     (140, 2),
     (141, 2),
     (141, 1),
     (141, 1),
     (142, 2),
     (142, 2),
     (142, 2),
     (143, 2),
     (143, 2),
     (143, 2),
     (144, 1),
     (144, 1),
     (144, 2),
     (145, 1),
     (145, 1),
     (145, 1),
     (146, 2),
     (146, 2),
     (146, 2),
     (147, 2),
     (147, 2),
     (147, 2),
     (148, 2),
     (148, 2),
     (148, 2),
     (149, 2),
     (149, 2),
     (149, 2),
     (150, 2),
     (150, 2),
     (150, 2),
     (151, 2),
     (151, 2),
     (151, 2),
     (152, 2),
     (152, 2),
     (152, 2),
     (153, 2),
     (153, 1),
     (153, 2),
     (154, 2),
     (154, 2),
     (154, 2),
     (155, 2),
     (155, 1),
     (155, 2),
     (156, 2),
     (156, 2),
     (156, 2),
     (157, 1),
     (157, 2),
     (157, 2),
     (158, 2),
     (158, 2),
     (158, 2),
     (159, 2),
     (159, 2),
     (159, 2),
     (160, 2),
     (160, 2),
     (160, 2),
     (161, 2),
     (161, 1),
     (161, 2),
     (162, 2),
     (162, 2),
     (162, 2),
     (163, 2),
     (163, 1),
     (163, 2),
     (164, 2),
     (164, 2),
     (164, 2),
     (165, 2),
     (165, 2),
     (165, 2),
     (166, 2),
     (166, 1),
     (166, 2),
     (167, 2),
     (167, 2),
     (167, 2),
     (168, 2),
     (168, 1),
     (168, 1),
     (169, 2),
     (169, 2),
     (169, 2),
     (170, 2),
     (170, 2),
     (170, 2),
     (171, 2),
     (171, 2),
     (171, 2),
     (172, 2),
     (172, 2),
     (172, 2),
     (173, 2),
     (173, 2),
     (173, 1),
     (174, 2),
     (174, 1),
     (174, 1),
     (175, 1),
     (175, 1),
     (175, 1),
     (176, 1),
     (176, 1),
     (176, 1),
     (177, 2),
     (177, 2),
     (177, 2)]
    

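    As far as I know, the simplest approach is to aggregate the values of each key into a Python dictionary, where each dictionary key is an RDD value and the associated dictionary value counts how many times that RDD value occurs for the RDD key. The RDD keys need no special handling, since aggregateByKey() takes care of them automatically.

    The aggregation call has the form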

    myRDD.aggregateByKey(dict(), withinPartition, betweenPartition)
    

    We initialize every accumulator as an empty dictionary.

    The within-partition aggregation function therefore has the following form

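    def withinPartition(dictionary, record):
        # record is one RDD value; dictionary maps RDD value -> count so far
        if record in dictionary:
            dictionary[record] += 1   # value seen before: increment its counter
        else:
            dictionary[record] = 1    # first occurrence: initialize the counter
        return dictionary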
    

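    Here dictionary is the counter for each RDD value, and record is a given RDD value (an integer in this case; see the RDD example above). Basically, if the given RDD value is already in the dictionary, we increment its counter by 1. Otherwise, we initialize the counter to 1.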
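
    As a quick local check, the within-partition function can be folded over a plain list with functools.reduce, which is roughly what happens to the values of one key inside a single partition. This is only a sketch that runs without Spark; the input list is the three values of key 136 from the sample RDD above.

```python
from functools import reduce

def withinPartition(dictionary, record):
    # Same logic as above: count how often each RDD value occurs.
    if record in dictionary:
        dictionary[record] += 1
    else:
        dictionary[record] = 1
    return dictionary

# Values of key 136 in the sample RDD above: 2, 1, 2
counts = reduce(withinPartition, [2, 1, 2], {})
print(counts)  # {2: 2, 1: 1}
```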

    The between-partition function works in almost the same way

    def betweenPartition(dictionary1, dictionary2):
        return {k: dictionary1.get(k, 0) + dictionary2.get(k, 0) for k in set(dictionary1) | set(dictionary2)}
    

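    Basically, for a given RDD key, suppose we have two dictionaries. We merge them into a single dictionary by summing the values of keys that appear in both, and by carrying over any key that appears in only one of them (hence the set union). Thanks to georg's solution in this post for the dictionary merge.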
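
    The merge can also be tried locally on two small dictionaries. The sketch below uses made-up partial results for a single RDD key, and additionally shows that adding two collections.Counter objects from the standard library gives the same totals, as long as all counts are positive (Counter addition drops zero and negative counts).

```python
from collections import Counter

def betweenPartition(dictionary1, dictionary2):
    # Union of the keys, summing counts and treating a missing key as 0.
    keys = set(dictionary1) | set(dictionary2)
    return {k: dictionary1.get(k, 0) + dictionary2.get(k, 0) for k in keys}

# Two hypothetical partial results for the same RDD key.
left = {2: 1}
right = {1: 1, 2: 1}
merged = betweenPartition(left, right)

# Counter addition yields the same totals for positive counts.
merged_counter = dict(Counter(left) + Counter(right))
print(merged == merged_counter)  # True
```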

    The resulting RDD will have the following form

    [(162, {2: 3}),
     (132, {2: 3}),
     (168, {1: 2, 2: 1}),
     (138, {2: 3}),
     (174, {1: 2, 2: 1}),
     (144, {1: 2, 2: 1}),
     (150, {2: 3}),
     (156, {2: 3}),
     (126, {2: 3}),
     (163, {1: 1, 2: 2}),
     (133, {2: 3}),
     (169, {2: 3}),
     (139, {2: 3}),
     (175, {1: 3}),
     (145, {1: 3}),
     (151, {2: 3}),
     (157, {1: 1, 2: 2}),
     (127, {2: 3}),
     (128, {2: 3}),
     (164, {2: 3}),
     (134, {2: 3}),
     (170, {2: 3}),
     (140, {2: 3}),
     (176, {1: 3}),
     (146, {2: 3}),
     (152, {2: 3}),
     (158, {2: 3}),
     (129, {2: 3}),
     (165, {2: 3}),
     (135, {2: 3}),
     (171, {2: 3}),
     (141, {1: 2, 2: 1}),
     (177, {2: 3}),
     (147, {2: 3}),
     (153, {1: 1, 2: 2}),
     (159, {2: 3}),
     (160, {2: 3}),
     (130, {2: 3}),
     (166, {1: 1, 2: 2}),
     (136, {1: 1, 2: 2}),
     (172, {2: 3}),
     (142, {2: 3}),
     (148, {2: 3}),
     (154, {2: 3}),
     (124, {2: 3}),
     (161, {1: 1, 2: 2}),
     (131, {2: 3}),
     (167, {2: 3}),
     (137, {2: 3}),
     (173, {1: 1, 2: 2}),
     (143, {2: 3}),
     (149, {2: 3}),
     (155, {1: 1, 2: 2}),
     (125, {2: 3})]
    

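    The original RDD keys are still present in this new RDD. Each new RDD value is a dictionary. In turn, each dictionary key corresponds to one of the possible RDD values, and each dictionary value counts how many times that RDD value occurs for the given RDD key.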
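
    If the (key, [...buckets...], [...counts...]) layout from the question is needed, each (key, dictionary) pair can be reshaped afterwards. The helper to_buckets_and_counts below is a hypothetical name, and "bucket" here means a distinct RDD value rather than a numeric range; the sketch runs on three entries copied from the result above.

```python
def to_buckets_and_counts(pair):
    """Turn (key, {value: count}) into (key, [values...], [counts...])."""
    key, counter = pair
    buckets = sorted(counter)                # distinct RDD values, ascending
    counts = [counter[b] for b in buckets]   # counts in the same order
    return key, buckets, counts

# Three entries copied from the result above.
aggregated = [(162, {2: 3}), (168, {1: 2, 2: 1}), (175, {1: 3})]
formatted = [to_buckets_and_counts(p) for p in aggregated]
print(formatted)
# [(162, [2], [3]), (168, [1, 2], [2, 1]), (175, [1], [3])]
```

    On the aggregated RDD itself, the same function could presumably be applied with .map(to_buckets_and_counts).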

    【Comments】:

      【Solution 2】:

      Try:

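      >>> import numpy as np
      >>>
      >>> # Python 3 does not allow tuple-unpacking lambdas such as lambda (x, y): ...
      >>> # mapValues keeps each key next to its histogram.
      >>> rdd.groupByKey().mapValues(lambda values: np.histogram(list(values)))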
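
      For a fixed number of buckets n per key without depending on NumPy, a plain-Python helper can be passed to mapValues instead. The function bucket_histogram below is a hypothetical sketch, not a Spark or NumPy API: it assumes a non-empty group of numeric values and n >= 1, uses equal-width buckets between the group's minimum and maximum, and closes the last bucket on the right. Values that sit very close to a bucket edge may be affected by floating-point rounding.

```python
def bucket_histogram(values, n):
    """Return (edges, counts) for n equal-width buckets over values."""
    values = list(values)          # materialize the iterable from groupByKey
    lo, hi = min(values), max(values)
    if lo == hi or n == 1:
        # Degenerate case: a single bucket holding everything.
        return [lo, hi], [len(values)]
    width = (hi - lo) / n
    edges = [lo + i * width for i in range(n)] + [hi]
    counts = [0] * n
    for v in values:
        # The last bucket is closed on the right, so hi lands in bucket n - 1.
        index = min(int((v - lo) / width), n - 1)
        counts[index] += 1
    return edges, counts

edges, counts = bucket_histogram([1, 2, 2, 3, 4], 3)
print(edges, counts)  # [1.0, 2.0, 3.0, 4] [1, 2, 2]
```

      Assuming an existing pair RDD named rdd, this would plug in as rdd.groupByKey().mapValues(lambda vs: bucket_histogram(vs, n)).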
      

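      【Comments】:

      • This does not work for me. np.histogram will not accept the "ResultIterable" produced by groupByKey.
      • Please explain why you think this would help. Code-only answers are usually not very useful.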