【问题标题】:Pyspark SQL Pandas UDF: Returning an arrayPyspark SQL Pandas UDF:返回一个数组
【发布时间】:2019-02-26 01:51:23
【问题描述】:

我正在尝试制作一个 pandas UDF,它接收具有整数值的两列,并根据这些值之间的差异返回一个长度等于上述差异的小数数组。

到目前为止,这是我的尝试,我一直在用很多不同的方法来尝试让它发挥作用,但这是总体思路

import pandas as pd

@pandas_udf(ArrayType(DecimalType()), PandasUDFType.SCALAR)
def zero_pad(x, y):
  buffer = []

  for i in range(0, (x - y)):
    buffer.append(0.0)

  return buffer #correction provided by Ali Yessili

这是我如何使用它的示例

df = df.withColumn("zero_list", zero_pad(df.x, df.y))

最终结果是 df,其中一个名为 zero_list 的新列是一个 ArrayType(DecimalType()) 列,看起来像 [0.0, 0.0, 0.0, ...],其长度为 (df.x - df.y)

错误消息太笼统了,几乎不值得发布,只是“作业因阶段失败而中止”,它只能追溯到我执行df.show() 的代码部分,

Py4JJavaError                             Traceback (most recent call last)
<command-103561> in <module>()
---> 33 df.orderBy("z").show(n=1000)

/databricks/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    350         """
    351         if isinstance(truncate, bool) and truncate:
--> 352             print(self._jdf.showString(n, 20, vertical))
    353         else:
    354             print(self._jdf.showString(n, int(truncate), vertical))

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

我希望有人能指出我正确的方向来制作一个将返回可变长度数组的 pandas udf,或者只是告诉我为什么我的代码或方法是错误的。

我正在使用带有 spark 2.3.1 的数据块来完成所有这些工作。

【问题讨论】:

    标签: python pandas pyspark pyspark-sql databricks


    【解决方案1】:

    这个问题是大约一年前的问题,但我遇到了同样的问题,这是我使用pandas_udf 的解决方案:

    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from pyspark.sql.types import *
    
    @pandas_udf(ArrayType(IntegerType()), PandasUDFType.SCALAR)
    def zero_pad(xs,ys):
        buffer = []
        for idx, x in enumerate(xs):
            buffer.append([0]*int(x-ys[idx]))
    
        return pd.Series(buffer)
    
    df = df.withColumn("zero_list", zero_pad(df.x, df.y))
    

    【讨论】:

      【解决方案2】:

      我不明白您为什么要从函数返回 pandas Series 值。它为每个输入返回多行。

      >>> import pandas as pd
      >>> def zero_pad(x, y):
      ...     buffer = []
      ...     for i in range(0, (x - y)):
      ...             buffer.append(0.0)
      ...     return pd.Series(buffer)
      ... 
      >>> zero_pad(5,1)
      0    0.0
      1    0.0
      2    0.0
      3    0.0
      dtype: float64
      

      因此,您不能添加具有多行结果的列。

      另一方面,您不能在 withColumn 语句中直接使用 udf。请在下面查看我的脚本,我认为结果正是您想要的

      >>> from pyspark.sql.functions import udf
      >>> 
      >>> data = sc.parallelize([
      ...     (2,1),
      ...     (8,1),
      ...     (5,2),
      ...     (6,4)])
      >>> columns = ['x','y']
      >>> df = spark.createDataFrame(data, columns)
      >>> df.show()
      +---+---+
      |  x|  y|
      +---+---+
      |  2|  1|
      |  8|  1|
      |  5|  2|
      |  6|  4|
      +---+---+
      
      >>> def zero_pad(x, y):
      ...     buffer = []
      ...     for i in range(0, (x - y)):
      ...             buffer.append(0.0)
      ...     return buffer
      ... 
      >>> my_udf = udf(zero_pad)
      >>> df = df.withColumn("zero_list", my_udf(df.x, df.y))
      >>> df.show()
      +---+---+--------------------+
      |  x|  y|           zero_list|
      +---+---+--------------------+
      |  2|  1|               [0.0]|
      |  8|  1|[0.0, 0.0, 0.0, 0...|
      |  5|  2|     [0.0, 0.0, 0.0]|
      |  6|  4|          [0.0, 0.0]|
      +---+---+--------------------+
      

      【讨论】:

      • 您的第一句话是绝对正确的,这是我可以使用的解决方案,但我需要自己的解决方案来使用 pandas UDF:databricks.com/blog/2017/10/30/… 否则我会选择您的答案。
      猜你喜欢
      • 2020-09-03
      • 1970-01-01
      • 2021-11-21
      • 1970-01-01
      • 2018-05-20
      • 2021-12-13
      • 2020-11-02
      • 1970-01-01
      • 2020-02-15
      相关资源
      最近更新 更多