[Posted]: 2018-10-08 10:15:22
[Problem description]:
I am trying to create a UDF in pyspark that rounds one column to the precision specified by another column in each row. For example, given the following data frame:
+--------+--------+
| Data|Rounding|
+--------+--------+
|3.141592| 3|
|0.577215| 1|
+--------+--------+
When passed through that UDF, it should produce the following result:
+--------+--------+--------------+
| Data|Rounding|Rounded Column|
+--------+--------+--------------+
|3.141592| 3| 3.142|
|0.577215| 1| 0.6|
+--------+--------+--------------+
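For reference, the intended row-wise behavior is just Python's built-in round applied per row, with the second column supplying the number of digits; a plain-Python sketch (no Spark involved):

```python
# Per-row rounding: each data value is rounded to that row's precision.
rows = [(3.141592, 3), (0.577215, 1)]
rounded = [round(data, ndigits) for data, ndigits in rows]
print(rounded)  # [3.142, 0.6]
```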
Specifically, I tried the following code:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType, LongType, IntegerType

pdDF = pd.DataFrame(columns=["Data", "Rounding"], data=[[3.141592, 3], [0.577215, 1]])
mySchema = StructType([StructField("Data", FloatType(), True),
                       StructField("Rounding", IntegerType(), True)])

spark = SparkSession.builder.master("local").appName("column rounding").getOrCreate()
df = spark.createDataFrame(pdDF, schema=mySchema)
df.show()

def round_column(Data, Rounding):
    return (lambda (Data, Rounding): round(Data, Rounding), FloatType())

spark.udf.register("column rounded to the precision specified by another", round_column, FloatType())

df_rounded = df.withColumn('Rounded Column', round_column(df["Data"], df["Rounding"]))
df_rounded.show()
But I get the following error:
Traceback (most recent call last):
File "whatever.py", line 21, in <module>
df_redondeado = df.withColumn('columna_redondeada',round_column(df["Data"], df["Rounding"]))
File "whomever\spark\spark-2.3.1-bin-hadoop2.7\python\pyspark\sql\dataframe.py", line 1848, in withColumn
assert isinstance(col, Column), "col should be Column"
AssertionError: col should be Column
Any help would be greatly appreciated :)
[Discussion]:
Tags: apache-spark pyspark user-defined-functions