您可以使用纯 pyspark 代替 UDF:
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import types
# Sample data: (name, time) pairs used throughout the examples below.
rows = [
    ("a", 5.2), ("b", 10.4), ("c", 7.8), ("d", 11.2),
    ("e", 3.5), ("f", 6.27), ("g", 2.43),
]
# Build a two-column DataFrame; `spark` is the active SparkSession.
df = spark.createDataFrame(rows, ["name", "time"])
df.show()
# output
+----+----+
|name|time|
+----+----+
| a| 5.2|
| b|10.4|
| c| 7.8|
| d|11.2|
| e| 3.5|
| f|6.27|
| g|2.43|
+----+----+
(
    df
    # when/otherwise forces a concrete boolean (a null `time` maps to
    # False); a bare comparison would propagate null instead.
    .withColumn("col1", F.when(F.col("time") > 10, True).otherwise(False))
    .withColumn("col2", F.when(F.col("time") < 0, True).otherwise(False))
    # Fixed: the column was referenced as "Time" (mixed case). That only
    # worked because Spark resolves names case-insensitively by default;
    # it breaks with spark.sql.caseSensitive=true. Use "time" consistently.
    .withColumn(
        "col3",
        F.when(
            (F.col("time") <= 12) & (F.col("time") >= 0), True
        ).otherwise(False),
    )
    .show()
)
# output
+----+----+-----+-----+----+
|name|time| col1| col2|col3|
+----+----+-----+-----+----+
| a| 5.2|false|false|true|
| b|10.4| true|false|true|
| c| 7.8|false|false|true|
| d|11.2| true|false|true|
| e| 3.5|false|false|true|
| f|6.27|false|false|true|
| g|2.43|false|false|true|
+----+----+-----+-----+----+
编辑:
如果出于某种原因需要 UDF,也许你可以这样做:
@F.pandas_udf(types.BooleanType())
def build_col1(s: pd.Series) -> pd.Series:
    """Return a boolean Series: True where time > 10.

    Uses a vectorized pandas comparison instead of ``s.apply`` with a
    Python lambda — same result, no per-row interpreter overhead.
    """
    return s > 10
@F.pandas_udf(types.BooleanType())
def build_col2(s: pd.Series) -> pd.Series:
    """Return a boolean Series: True where time < 0.

    Vectorized comparison replaces the per-element ``s.apply`` lambda.
    """
    return s < 0
@F.pandas_udf(types.BooleanType())
def build_col3(s: pd.Series) -> pd.Series:
    """Return a boolean Series: True where 0 <= time <= 12.

    The original ``s.apply(lambda x: x >= 0 and x <= 12)`` evaluated a
    Python lambda per row; the element-wise ``&`` of two vectorized
    comparisons computes the same values in one pass.
    """
    return (s >= 0) & (s <= 12)
# Apply each UDF to the "time" column, adding one boolean column per UDF.
result = df
for new_col, udf in (("col1", build_col1),
                     ("col2", build_col2),
                     ("col3", build_col3)):
    result = result.withColumn(new_col, udf("time"))
result.show()
# output
+----+----+-----+-----+----+
|name|time| col1| col2|col3|
+----+----+-----+-----+----+
| a| 5.2|false|false|true|
| b|10.4| true|false|true|
| c| 7.8|false|false|true|
| d|11.2| true|false|true|
| e| 3.5|false|false|true|
| f|6.27|false|false|true|
| g|2.43|false|false|true|
+----+----+-----+-----+----+