【问题标题】:Pyspark RDD aggregate different value fields differentlyPyspark RDD 以不同方式聚合不同的值字段
【发布时间】:2020-03-31 23:26:19
【问题描述】:

这是一个非常开放的问题,但我有一个这种格式的 RDD。

[('2014-06', ('131313', 5.5, 6.5, 7.5, 10.5 )),
('2014-07', ('246655', 636636.53, .53252, 5252.112, 5242.23)),
('2014-06', ('131232', 1, 2, 4.5, 5.5)),
('2014-07', ('131322464', 536363.6363, 536336.6363, 3563.63636, 9.6464464646464646))]

我想通过键对每个值进行不同的分组和聚合。例如,对于键 '2014-06',我想获取第一个值字段的计数,即 '131313',以及键 '2014-06' 的其他字段 5.5, 6.5, 7.5, 10.5 的平均值。

因此,上述简单示例的键 '2014-06' 的结果将是 ('2014-06', (2, 3.25, 5.5, 8))

对 RDD 执行此操作的最佳方法是什么? 我不能使用任何 Spark SQL 表达式或函数,只能使用 RDD 函数。

我正在考虑用 mapValues 做一些事情并使用其他函数,但我在制定这个函数时遇到了一些问题。

我知道这个问题是开放式的,所以如果您还有其他问题,请告诉我。

感谢您的宝贵时间。

【问题讨论】:

    标签: python apache-spark pyspark aggregate rdd


    【解决方案1】:

    一种方法是使用 ma​​p() 方法将第一个值转换为 1(用于记录计数),然后使用 reduceByKey() 将每个值与同一把钥匙。最后,使用 ma​​pValues() 计算平均值,除了第一个是计数(保持原样)。

    rdd.map(lambda x: (x[0], (1, *x[1][1:]))) \
       .reduceByKey(lambda x,y: tuple([x[i]+y[i] for i in range(len(x))])) \
       .mapValues(lambda x: (x[0], *[ e/x[0] for e in x[1:]])) 
    

    ma​​p() 之后:

    [('2014-06', (1, 5.5, 6.5, 7.5, 10.5)),
     ('2014-07', (1, 636636.53, 0.53252, 5252.112, 5242.23)),
     ('2014-06', (1, 1, 2, 4.5, 5.5)),
     ('2014-07', (1, 536363.6363, 536336.6363, 3563.63636, 9.646446464646464))]
    

    reduceByKey() 之后:

    [('2014-06', (2, 6.5, 8.5, 12.0, 16.0)),
     ('2014-07',
      (2, 1173000.1663000002, 536337.16882, 8815.74836, 5251.876446464646))]
    

    ma​​pValues() 之后:

    [('2014-06', (2, 3.25, 4.25, 6.0, 8.0)),
     ('2014-07',
      (2, 586500.0831500001, 268168.58441, 4407.87418, 2625.938223232323))]
    

    【讨论】:

    • 嗯,实际上这在我使用的数据库上似乎对我不起作用,它创建字符串而不是做你概述的事情
    • @mrsquid,从您提供的示例数据来看,这些 RDD 方法不太可能进行字符串操作。
    • 问题在于第一张地图之后的第一张地图从看起来像这样 [('2014-6', ('2355134', 68.0, 66.0, 189.0, 6.0))]像这样 [('2355134', (1, '0', '1', '4'))]。我认为索引没有正确指定
    • 我无法重现这个,你确定你运行的是正确的代码吗?如果 RDD 元素类似于('2014-6', ('2355134', 68.0, 66.0, 189.0, 6.0)),则 x[0] 为2014-06,'2355134' 为x[1][0]
    • 是的,我真的很抱歉这是我的错。与我给出的示例相比,原始数据集中的索引不同,到目前为止代码运行良好。又是我的坏
    【解决方案2】:

    @jxc 解决方案可以满足您的需求,但这是另一种方式。

    您可以使用aggregateByKey。该函数接受两个函数seqFunccombFunc 和一个称为中性零值的累加器值。

    zero_value = (0, 0, 0, 0, 0)
    d = rdd.aggregateByKey(zero_value, lambda x, y: (1, *y[1:]),
                           lambda x, y: tuple(map(add, x, y))
                           ) \
        .mapValues(lambda v: (v[0], *[i / v[0] for i in v[1:]])) \
    

    第一个 lambda 表达式通过将第一个字符串字段替换为整数 1(计数一次)来转换每个值。 第二个 lambda 表达式通过添加两个列表来合并两个值。

    在这个聚合之后,我们只需要将每个值列表的元素除以给出平均值的第一个元素。

    输出:

    [('2014-06', (2, 3.25, 4.25, 6.0, 8.0)), ('2014-07', (2, 586500.0831500001, 268168.58441, 4407.87418, 2625.938223232323))]
    

    【讨论】:

    • 嗯,实际上计数似乎不起作用,即使该键有数百个不同的值,它也只会为每个键提供两个。
    • @mrsquid 你确定吗?刚刚用两个以上的键值对它进行了测试,它给出了正确的计数......你能举个例子吗?
    • 让我看看是否可以,我不确定为什么这不适用于完整的数据集。逻辑是合理的,它看起来与上面的版本相同,只是用 aggregateByKey 代替......
    • @mrsquid,问题出在 seqFunc 上,它旨在在分区内进行聚合。函数lambda x, y: (1, *y[1:]) 不进行任何聚合,它只为每个键保留一个元素并丢弃其他元素。这就是为什么你失去了很多价值观。最简单的测试方法是我在删除的评论中提到的,对于给定的示例,4 个元素和 2 个键,将数据移动到同一个分区,即运行 rdd.repartition(1).aggregateByKey(..)。此方法仅在同一分区中最多显示一次相同的键时才有用,这对于任何大数据项目都是不切实际的。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-10-15
    • 2013-05-26
    • 1970-01-01
    • 1970-01-01
    • 2016-08-16
    • 2013-09-01
    • 1970-01-01
    相关资源
    最近更新 更多