【问题标题】:Pyspark; Using ReduceByKey on list values派斯帕克;在列表值上使用 ReduceByKey
【发布时间】:2021-03-08 15:14:30
【问题描述】:

我试图更好地理解 reduceByKey 函数,并一直在探索使用它完成不同任务的方法。我想应用如下所示的 RDD 数据。一行数据的格式是一个带有名称的元组,然后是与该名称关联的所有日期的列表(下面是数据外观的副本)

data = [("Cassavetes, Frank", ['2012', '2002', '2009', '2005']),
("Knight, Shirley (I)", ['1997', '2002', '2009']),
("Yip, Françoise", ['2007', '2004', '2000']),
("Danner, Blythe", ['2000', '2008', '2012', '2010', '2004', '2004', '1999', '1998']),
("Buck (X)", ['2002', '2006', '2009'])]

为了获得与元组中每个名称关联的所有日期的计数,我应用了下面的代码,使用 reduceByKey 函数尝试将日期列表转换为列表。

rdd = spark.sparkContext.parallelize(data)
reducedRdd = rdd.reduceByKey( lambda a,b: len(a.split(" ")) + len(b.split(" ")) )
reducedRdd.take(1)

上面的代码产生与输入数据相同的结果,并且不执行reduce函数中列出的任何转换,下面是代码输出的示例:

[('Yip, Françoise', ['2007', '2004', '2000'])]

我预期的输出如下;

[("Yip, Françoise", 3)]

为什么我上面写的代码没有给我预期的输出,我将如何改变它以确保它可以?

【问题讨论】:

    标签: apache-spark pyspark rdd


    【解决方案1】:

    您正在寻找map,而不是reduceByKey。没有什么可以减少的,因为你的数据已经按 key 分组了,所以你的 RDD 上没有做任何事情,你得到了原始的 RDD。

    rdd2 = rdd.map(lambda x: (x[0], len(x[1])))
    
    print(rdd2.collect())
    # [('Cassavetes, Frank', 4), ('Knight, Shirley (I)', 3), ('Yip, Françoise', 3), ('Danner, Blythe', 8), ('Buck (X)', 3)]
    

    mapValues 可能更合适:

    rdd2 = rdd.mapValues(len)
    
    print(rdd2.collect())
    # [('Cassavetes, Frank', 4), ('Knight, Shirley (I)', 3), ('Yip, Françoise', 3), ('Danner, Blythe', 8), ('Buck (X)', 3)]
    

    如果你想使用reduceByKey,你的数据应该被取消分组。例如如果你有

    data = [('Cassavetes, Frank', '2012'), ('Cassavetes, Frank', '2002'), ('Cassavetes, Frank', '2009'), ('Cassavetes, Frank', '2005'), ('Knight, Shirley (I)', '1997'), ('Knight, Shirley (I)', '2002'), ('Knight, Shirley (I)', '2009'), ('Yip, Françoise', '2007'), ('Yip, Françoise', '2004'), ('Yip, Françoise', '2000'), ('Danner, Blythe', '2000'), ('Danner, Blythe', '2008'), ('Danner, Blythe', '2012'), ('Danner, Blythe', '2010'), ('Danner, Blythe', '2004'), ('Danner, Blythe', '2004'), ('Danner, Blythe', '1999'), ('Danner, Blythe', '1998'), ('Buck (X)', '2002'), ('Buck (X)', '2006'), ('Buck (X)', '2009')]
    

    那你就可以了

    rdd = sc.parallelize(data)
    
    from operator import add
    rdd2 = rdd.map(lambda x: (x[0], 1)).reduceByKey(add)
    
    rdd2.collect()
    # [('Yip, Françoise', 3), ('Cassavetes, Frank', 4), ('Knight, Shirley (I)', 3), ('Danner, Blythe', 8), ('Buck (X)', 3)]
    

    【讨论】:

    • 这是有道理的,我想避免使用 map 函数,因为练习是关于探索 reduce 函数(reduce 或 reduceByKey)。如果不使用映射函数,这种事情(对分组值(例如我的数据集中的列表)执行减少输出计算)是不可能的吗?
    • 问题是,您的数据已经分组了,所以reduce 不会做任何事情,但如果您的原始数据未分组,您可以使用reduce。跨度>
    • 我添加了一个使用reduceByKey 的示例。看看对你有没有帮助:)
    • 回过头来看,我尝试使用reduceByKey对未接地数据使用以下代码; rdd.reduceByKey(lambda a,b: len(a)+len(b)) 但是当我尝试这样做并使用 take 函数查看输出时,我得到一个错误; “TypeError:'int' 类型的对象没有 len()”。我只能通过使用以下代码将我的 lambda 表达式中的变量 ab 强制转换为字符串来使其工作; rdd.reduceByKey(lambda a,b: len(str(a))+len(str(b)))。为什么我的变量 a 和 b 被读取为整数而不是字符串(这就是年份)。
    • 我需要查看更多代码才能得出结论。如果您可以提出另一个问题并提供必要的详细信息,那就太好了。这里的 cmets 部分没有足够的空间。
    猜你喜欢
    • 2021-07-25
    • 2021-07-07
    • 2018-07-30
    • 1970-01-01
    • 2023-04-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多