【问题标题】:Mapping key and list of values to key value using pyspark使用 pyspark 将键和值列表映射到键值
【发布时间】:2021-02-25 03:08:10
【问题描述】:

我有一个由两列 C1 和 C2 组成的数据集。这些列与多对多关系相关联。

我想做的是为每个 C2 找到与整体 C2 值关联最多的值 C1。

例如:

C1  | C2
 1  | 2
 1  | 5
 1  | 9
 2  | 9
 2  | 8

我们可以在这里看到 1 与 C2 的 3 个值匹配,而 2 与 2 匹配,所以我想作为输出:

   Out1 |Out2| matches
     2  | 1  | 3
     5  | 1  | 3
     9  | 1  | 3 (1 wins because 3>2)
     8  | 2  | 2

到目前为止我所做的是:

  dataset = sc.textFile("...").\
          map(lambda line: (line.split(",")[0],list(line.split(",")[1]) ) ).\
          reduceByKey(lambda x , y : x+y )   

这是为每个 C1 值收集所有 C2 匹配项,此列表的计数是我们想要的匹配列。我现在想要的是以某种方式将此列表中的每个值用作新键并具有如下映射:

(Key ,Value_list[value1,value2,...]) -->(value1 , key ),(value2 , key)...

如何使用 spark 来做到这一点?任何建议都会很有帮助。

提前致谢!

【问题讨论】:

    标签: apache-spark pyspark


    【解决方案1】:

    对于此类任务,数据框 API 可能更容易。可以按C1分组,得到计数,再按C2分组,得到匹配次数最多的C1的值。

    import pyspark.sql.functions as F
    
    df = spark.read.csv('file.csv', header=True, inferSchema=True)
    
    df2 = (df.groupBy('C1')
             .count()
             .join(df, 'C1')
             .groupBy(F.col('C2').alias('Out1'))
             .agg(
               F.max(
                 F.struct(F.col('count').alias('matches'), F.col('C1').alias('Out2'))
               ).alias('c')
             )
             .select('Out1', 'c.Out2', 'c.matches')
             .orderBy('Out1')
          )
    
    df2.show()
    +----+----+-------+
    |Out1|Out2|matches|
    +----+----+-------+
    |   2|   1|      3|
    |   5|   1|      3|
    |   8|   2|      2|
    |   9|   1|      3|
    +----+----+-------+
    

    【讨论】:

      【解决方案2】:

      我们可以使用 dataframe API 轻松获得所需的结果。

      from pyspark.sql import *
      import pyspark.sql.functions as fun
      from pyspark.sql.window import Window
      
      spark = SparkSession.builder.master("local[*]").getOrCreate()
      
      # preparing sample dataframe
      data = [(1, 2), (1, 5), (1, 9), (2, 9), (2, 8)]
      schema = ["c1", "c2"]
      df = spark.createDataFrame(data, schema)
       
      output = df.withColumn("matches", fun.count("c1").over(Window.partitionBy("c1"))) \
      .groupby(fun.col('C2').alias('out1')) \
      .agg(fun.first(fun.col("c1")).alias("out2"), fun.max("matches").alias("matches"))
      
      output.show()
      
      # output
      +----+----+-------+
      |Out1|out2|matches|
      +----+----+-------+
      |   9|   1|      3|
      |   5|   1|      3|
      |   8|   2|      2|
      |   2|   1|      3|
      +----+----+-------+
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2020-02-13
        • 2021-09-02
        • 1970-01-01
        • 2021-06-19
        • 1970-01-01
        • 2019-04-29
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多