【问题标题】:How to get value_counts for a spark row?如何获取火花行的 value_counts?
【发布时间】:2026-02-05 01:30:01
【问题描述】:

我有一个 spark 数据框,其中 3 列存储 3 个不同的预测。我想知道每个输出值的计数,以便选择获得最大次数的值作为最终输出。

我可以在 pandas 中轻松地做到这一点,方法是为每一行调用我的 lambda 函数来获取 value_counts,如下所示。我已经在这里将我的 spark df 转换为 pandas df,但是我需要能够直接对 spark df 执行类似的操作。

r=[Row(run_1=1, run_2=2, run_3=1, name='test run', id=1)]
df1=spark.createDataFrame(r)
df1.show()
df2=df1.toPandas()
r=df2.iloc[0]
val_counts=r[['run_1','run_2','run_3']].value_counts()
print(val_counts)
top_val=val_counts.index[0] 
top_val_cnt=val_counts.values[0]
print('Majority output = %s, occured %s out of 3 times'%(top_val,top_val_cnt))

输出告诉我值 1 出现的次数最多——在这种情况下是两次——

+---+--------+-----+-----+-----+
| id|    name|run_1|run_2|run_3|
+---+--------+-----+-----+-----+
|  1|test run|    1|    2|    1|
+---+--------+-----+-----+-----+

1    2
2    1
Name: 0, dtype: int64

Majority output = 1, occured 2 out of 3 times

我正在尝试编写一个 udf 函数,它可以获取每个 df1 行并获取 top_val 和 top_val_cnt。有没有办法使用 spark df 实现这一点?

【问题讨论】:

    标签: dataframe apache-spark pyspark


    【解决方案1】:

    python的代码应该差不多,或许对你有帮助

      val df1 = Seq((1, 1, 1, 2), (1, 2, 3, 3), (2, 2, 2, 2)).toDF()
      df1.show()
      df1.select(array('*)).map(s=>{
        val list = s.getList(0)
        (list.toString(),list.toArray.groupBy(i => i).mapValues(_.size).toList.toString())
      }).show(false)
    

    输出:

    +---+---+---+---+
    | _1| _2| _3| _4|
    +---+---+---+---+
    |  1|  1|  1|  2|
    |  1|  2|  3|  3|
    |  2|  2|  2|  2|
    +---+---+---+---+
    
    +------------+-------------------------+
    |_1          |_2                       |
    +------------+-------------------------+
    |[1, 1, 1, 2]|List((2,1), (1,3))       |
    |[1, 2, 3, 3]|List((2,1), (1,1), (3,2))|
    |[2, 2, 2, 2]|List((2,4))              |
    +------------+-------------------------+
    

    【讨论】:

    • 谢谢@AndrzejS。我只是在寻找这样的东西:-)
    【解决方案2】:

    让我们有一个和你类似的测试数据框。

     list = [(1,'test run',1,2,1),(2,'test run',3,2,3),(3,'test run',4,4,4)]
    df=spark.createDataFrame(list, ['id', 'name','run_1','run_2','run_3'])
    
    newdf = df.rdd.map(lambda x : (x[0],x[1],x[2:])) \
    .map(lambda x : (x[0],x[1],x[2][0],x[2][1],x[2][2],[max(set(x[2]),key=x[2].count )])) \
    .toDF(['id','test','run_1','run_2','run_3','most_frequent'])
    
    
    >>> newdf.show()
    +---+--------+-----+-----+-----+-------------+
    | id|    test|run_1|run_2|run_3|most_frequent|
    +---+--------+-----+-----+-----+-------------+
    |  1|test run|    1|    2|    1|          [1]|
    |  2|test run|    3|    2|    3|          [3]|
    |  3|test run|    4|    4|    4|          [4]|
    +---+--------+-----+-----+-----+-------------+
    

    或者当列表中的每个项目都不同时,您需要处理一个案例。即返回一个空值。

    list = [(1,'test run',1,2,1),(2,'test run',3,2,3),(3,'test run',4,4,4),(4,'test run',1,2,3)]
    df=spark.createDataFrame(list, ['id', 'name','run_1','run_2','run_3'])
    
    from pyspark.sql.functions import udf
    
    @udf
    def most_frequent(*mylist): 
        counter = 1
        num = mylist[0] 
    
        for i in mylist: 
            curr_frequency = mylist.count(i) 
            if(curr_frequency> counter): 
                counter = curr_frequency 
                num = i 
    
                return num
        else:
                return None
    

    将计数器初始化为“1”并仅在大于“1”时返回计数。

    df.withColumn('most_frequent', most_frequent('run_1', 'run_2', 'run_3')).show()
    
    +---+--------+-----+-----+-----+-------------+
    | id|    name|run_1|run_2|run_3|most_frequent|
    +---+--------+-----+-----+-----+-------------+
    |  1|test run|    1|    2|    1|            1|
    |  2|test run|    3|    2|    3|            3|
    |  3|test run|    4|    4|    4|            4|
    |  4|test run|    1|    2|    3|         null|
    +---+--------+-----+-----+-----+-------------+
    +---+--------+-----+-----+-----+----+
    

    【讨论】: