【问题标题】:Calculate the number of records whose values falls into each bin in SPARK计算其值落入 SPARK 中每个 bin 的记录数
【发布时间】:2021-08-21 09:34:02
【问题描述】:

我的数据框如下:

------+--------------+
|   sid|first_term_gpa|
+------+--------------+
|100170|           2.0|
|100446|        3.8333|
|100884|           2.0|
|101055|           3.0|
|101094|        3.7333|
|101775|        3.7647|
|102524|        3.8235|
|102798|           3.5|
|102960|        2.8235|
|103357|           3.0|
|103747|        3.8571|
|103902|           3.8|
|104053|        3.1667|
|104064|        1.8462|

我已经创建了一个 UDF 函数

def student_gpa(gpa):
    bins = ['[0,1)', '[1,2)', '[2,3)', '[3,4)']
    return bins[float(gpa)]

参数 gpa 预计为 float

我将上面创建的 UDF 应用到 f​​irst_term_gpa 列以创建一个名为 gpa_bin 的新列,代码如下:

alumni_ft_gpa = first_term_gpa \
.withColumn('gpa_bin', expr('student_gpa(first_term_gpa)'))\
.show()

但它会引发错误:

An exception was thrown from a UDF: 'TypeError: list indices must be integers or slices, not float', 

我在这里缺少什么?

【问题讨论】:

  • 错误很明显——你不能使用浮点数作为列表索引。只需使用 bins[int(gpa)] 代替
  • @mck 我试过了,它抛出了一个异常从 UDF 中抛出:'IndexError: list index out of range'
  • 那么你必须检查索引
  • @user1997567:实际上,当我尝试时,这两种解决方案都能完美运行。您能否更新您的问题并尝试我链接到的解决方案之一?

标签: dataframe apache-spark filter pyspark user-defined-functions


【解决方案1】:

使用导入

这是一个基于您的尝试的可行解决方案:

from pyspark.sql import Row, functions as F
from pyspark.sql.types import StringType   


df = spark.createDataFrame(
[Row(sid=100170, first_term_gpa=2.0),
 Row(sid=100446, first_term_gpa=3.8333),
 Row(sid=100884, first_term_gpa=2.0),
 Row(sid=101055, first_term_gpa=3.0),
 Row(sid=101094, first_term_gpa=3.7333),
 Row(sid=101775, first_term_gpa=3.7647),
 Row(sid=102524, first_term_gpa=3.8235),
 Row(sid=102798, first_term_gpa=3.5),
 Row(sid=102960, first_term_gpa=2.8235),
 Row(sid=103357, first_term_gpa=3.0),
 Row(sid=103747, first_term_gpa=3.8571),
 Row(sid=103902, first_term_gpa=3.8),
 Row(sid=104053, first_term_gpa=3.1667),
 Row(sid=104064, first_term_gpa=1.8462)]
)

@F.udf(StringType())
def student_gpa(gpa):
    bins = ['[0,1)', '[1,2)', '[2,3)', '[3,4)']
    return bins[int(gpa)]

df \
   .withColumn('gpa_bin', student_gpa('first_term_gpa'))\
   .show()

哪些输出

+------+--------------+-------+
|   sid|first_term_gpa|gpa_bin|
+------+--------------+-------+
|100170|           2.0|  [2,3)|
|100446|        3.8333|  [3,4)|
|100884|           2.0|  [2,3)|
|101055|           3.0|  [3,4)|
|101094|        3.7333|  [3,4)|
|101775|        3.7647|  [3,4)|
|102524|        3.8235|  [3,4)|
|102798|           3.5|  [3,4)|
|102960|        2.8235|  [2,3)|
|103357|           3.0|  [3,4)|
|103747|        3.8571|  [3,4)|
|103902|           3.8|  [3,4)|
|104053|        3.1667|  [3,4)|
|104064|        1.8462|  [1,2)|
+------+--------------+-------+

我将gpa 转换为整数的原因与我们构建区间的方式有关。例如。 gpa=2.5 预计会导致 bin [2,3) 对应于 bins 列表中的索引 2。我们通过将2.5 转换为整数来实现这一点。

仅使用 expr

from pyspark.sql.functions import expr

def student_gpa2(gpa):
    bins = ['[0,1)', '[1,2)', '[2,3)', '[3,4)']
    return bins[int(gpa)]

spark.udf.register("student_gpa2", student_gpa2)
df.withColumn('new_col', expr("student_gpa2(first_term_gpa)")).show()

【讨论】:

  • 复制你的代码后,我得到 int() 基数为 10 的无效文字:'first_term_gpa'
  • 我更新了如何创建 df。如果您仍然收到错误,您能否粘贴您的完整堆栈跟踪?另请显示您的数据框的架构
  • 不,你不能。需要这些导入
  • 但情况已经如此,udf 接受一个浮点数,然后返回一个字符串。我已经更新了我的解决方案来展示如何使用expr 来做到这一点。如果您下次能在问题中明确说明此限制,我们将不胜感激。
猜你喜欢
  • 1970-01-01
  • 2019-07-24
  • 2018-06-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多