【问题标题】:PySpark udf returns null when function works in Pandas dataframe当函数在 Pandas 数据框中工作时,PySpark udf 返回 null
【发布时间】:2020-02-15 01:38:35
【问题描述】:

我正在尝试创建一个用户定义的函数,该函数采用数组的累积总和并将该值与另一列进行比较。 这是一个可重现的示例:

from pyspark.sql.session import SparkSession

# instantiate Spark
spark = SparkSession.builder.getOrCreate()

# make some test data
columns = ['loc', 'id', 'date', 'x', 'y']
vals = [
    ('a', 'b', '2016-07-01', 1, 5),
    ('a', 'b', '2016-07-02', 0, 5),
    ('a', 'b', '2016-07-03', 5, 15),
    ('a', 'b', '2016-07-04', 7, 5),
    ('a', 'b', '2016-07-05', 8, 20),
    ('a', 'b', '2016-07-06', 1, 5)
]

# create DataFrame
temp_sdf = (spark
      .createDataFrame(vals, columns)
      .withColumn('x_ary', collect_list('x').over(Window.partitionBy(['loc','id']).orderBy(desc('date')))))

temp_df = temp_sdf.toPandas()

def test_function(x_ary, y):
  cumsum_array = np.cumsum(x_ary) 
  result = len([x for x in cumsum_array if x <= y])
  return result

test_function_udf = udf(test_function, ArrayType(LongType()))

temp_df['len'] = temp_df.apply(lambda x: test_function(x['x_ary'], x['y']), axis = 1)
display(temp_df)

在 Pandas 中,这是输出:

loc id  date        x   y   x_ary           len
a   b   2016-07-06  1   5   [1]             1
a   b   2016-07-05  8   20  [1,8]           2
a   b   2016-07-04  7   5   [1,8,7]         1
a   b   2016-07-03  5   15  [1,8,7,5]       2
a   b   2016-07-02  0   5   [1,8,7,5,0]     1
a   b   2016-07-01  1   5   [1,8,7,5,0,1]   1

在使用temp_sdf.withColumn('len', test_function_udf('x_ary', 'y')) 的Spark 中,所有len 最终都是null

有人知道为什么会这样吗?

此外,在 pySpark 中替换 cumsum_array = np.cumsum(np.flip(x_ary)) 失败并出现错误 AttributeError: module 'numpy' has no attribute 'flip',但我知道它存在,因为我可以使用 Pandas 数据框正常运行它。
这个问题可以解决吗,或者有没有更好的方法来使用 pySpark 翻转数组?

提前感谢您的帮助。

【问题讨论】:

  • flip 需要 2 个参数,好像你没有为它提供轴
  • @Sri_Karthik 第二个参数是可选的; numpy flip
  • @Sri_Karthik 原来我需要将 pyspark 从 2.4.3 升级到 2.4.4 - 现在似乎可以工作了。

标签: python pandas pyspark user-defined-functions


【解决方案1】:

由于 test_function 返回整数而不是列表/数组。正如您提到的错误返回类型一样,您将获得空值。因此,请删除“来自 udf 的 ArrayType”或将返回类型替换为 LongType(),然后它将按以下方式工作。 :

注意:您可以选择设置 UDF 的返回类型,否则默认返回类型为 StringType。

选项1:

test_function_udf = udf(test_function) # Returns String type

选项2:

test_function_udf = udf(test_function, LongType())  #Returns Long/integer type

temp_sdf = temp_sdf.withColumn('len', 
           test_function_udf('x_ary', 'y'))
temp_sdf.show()

【讨论】:

  • 这行得通,但我肯定不明白它为什么行得通。你能解释一下吗?
  • 嗨凯,请参考 test_function 。 test_function 返回整数类型而不是列表。因此提及数组类型将返回空值。请参考我编辑的答案。
猜你喜欢
  • 2020-09-03
  • 2019-02-26
  • 2022-06-23
  • 2021-07-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-10-06
  • 1970-01-01
相关资源
最近更新 更多