[Question Title]: Pyspark UDF to round one column to the precision specified by another column
[Posted]: 2018-10-08 10:15:22
[Problem Description]:

I am trying to create a UDF in pyspark that rounds one column to the precision specified, row by row, by another column. For example, given the following dataframe:

+--------+--------+
|    Data|Rounding|
+--------+--------+
|3.141592|       3|
|0.577215|       1|
+--------+--------+

When passed to said UDF, it should produce the following result:

+--------+--------+--------------+
|    Data|Rounding|Rounded Column|
+--------+--------+--------------+
|3.141592|       3|         3.142|
|0.577215|       1|           0.6|
+--------+--------+--------------+

In particular, I tried the following code:

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType, LongType, IntegerType

pdDF = pd.DataFrame(columns=["Data", "Rounding"],
                    data=[[3.141592, 3], [0.577215, 1]])

mySchema = StructType([StructField("Data", FloatType(), True),
                       StructField("Rounding", IntegerType(), True)])

spark = SparkSession.builder.master("local").appName("column rounding").getOrCreate()

df = spark.createDataFrame(pdDF, schema=mySchema)

df.show()

def round_column(Data, Rounding):
    return (lambda (Data, Rounding): round(Data, Rounding), FloatType())

spark.udf.register("column rounded to the precision specified by another",
                   round_column, FloatType())

df_rounded = df.withColumn('Rounded Column', round_column(df["Data"], df["Rounding"]))

df_rounded.show()

But I get the following error:

Traceback (most recent call last):
  File "whatever.py", line 21, in <module>
    df_redondeado = df.withColumn('columna_redondeada',round_column(df["Data"], df["Rounding"]))
  File "whomever\spark\spark-2.3.1-bin-hadoop2.7\python\pyspark\sql\dataframe.py", line 1848, in withColumn
    assert isinstance(col, Column), "col should be Column"
AssertionError: col should be Column

Any help would be greatly appreciated :)

[Comments]:

    Tags: apache-spark pyspark user-defined-functions


    [Solution 1]:

    As noted in another answer, your udf is not valid.

    You can use an inline udf as follows:

    from pyspark.sql.functions import udf, struct
    from pyspark.sql.types import FloatType

    udf_round_column = udf(lambda row: round(row['data'], row['rounding']), FloatType())
    df_rounded = df.withColumn('rounded_col', udf_round_column(struct('data', 'rounding')))
    

    Or as a standalone function:

    def round_column(data, rounding):
        return round(data, rounding)

    udf_round_column = udf(round_column, FloatType())
    df_rounded = df.withColumn('rounded_col', udf_round_column('data', 'rounding'))
    

    Both return this:

    +---+---------+--------+-----------+
    | id|     data|rounding|rounded_col|
    +---+---------+--------+-----------+
    |  1|3.1415926|       3|      3.142|
    |  2|  0.12345|       6|    0.12345|
    |  3|   2.3456|       1|        2.3|
    +---+---------+--------+-----------+
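The per-row computation inside either udf is just Python's built-in `round`, so the values in the table above can be checked without Spark at all (a pure-Python sketch; the row values are copied from the example table):

```python
# Rows as (data, rounding) pairs, mirroring the example table above.
rows = [(3.1415926, 3), (0.12345, 6), (2.3456, 1)]

# This is exactly the per-row work the udf performs:
# round `data` to `rounding` decimal digits.
rounded = [round(data, rounding) for data, rounding in rows]
print(rounded)  # [3.142, 0.12345, 2.3]
```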
    

    [Comments]:

      [Solution 2]:

      Your code fails because round_column is not a valid udf. You should use

      from pyspark.sql.functions import udf
      
      @udf(FloatType())
      def round_column(data, rounding):
          return round(data, rounding)
      

      spark.udf.register is for registering functions to be invoked from SQL queries, so it does not apply here.
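The original failure can in fact be reproduced without Spark: the question's round_column builds and returns a (lambda, FloatType()) tuple instead of computing anything, so withColumn received a tuple where it asserts a Column (a pure-Python illustration; broken_round_column is a hypothetical name mirroring the question's code):

```python
# A stripped-down version of the question's function: note that it returns
# a tuple of (function, type-name) rather than ever calling round().
def broken_round_column(data, rounding):
    return (lambda pair: round(pair[0], pair[1]), "FloatType()")

result = broken_round_column(3.141592, 3)
print(type(result))  # <class 'tuple'> -- not a Column, hence the AssertionError
```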

      But you don't need a udf at all. Just:

      from pyspark.sql.functions import expr
      
      df_rounded = df.withColumn('Rounded Column', expr('round(Data, Rounding)'))
      

      [Comments]:

      • Thank you very much for explaining the use of udfs in sql queries. However, could you please clarify which expression I should use in 'expr(round(Data, Rounding))'?
      [Solution 3]:

      If you want to apply the UDF to a dataframe, you can simply import it like this

      from pyspark.sql.functions import udf

      and use it like

      round_column_udf = udf(round_column, FloatType())
      df_rounded = df.withColumn('Rounded_Column', round_column_udf(df['Data'], df['Rounding']))

      Registering a udf is meant for spark sql queries, like

      spark.udf.register("round_column_udf", round_column, FloatType())
      df.registerTempTable("df")
      spark.sql("select Data, Rounding, round_column_udf(Data, Rounding) as Rounded_Column from df").show()

      Both should work.
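One caveat that applies to either call style: a plain Python udf raises a TypeError when an input is null, because round(None, n) is not defined. A common pattern is to guard inside the function so nulls propagate the way built-in Spark functions do (a hedged sketch; round_column_safe is a hypothetical name):

```python
def round_column_safe(data, rounding):
    # Return None for null inputs instead of raising inside the executor.
    if data is None or rounding is None:
        return None
    return round(data, int(rounding))

print(round_column_safe(3.141592, 3))  # 3.142
print(round_column_safe(None, 3))      # None
```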

      [Comments]:
