【Posted】: 2021-03-04 07:53:28
【Question】:
How do I construct a UDF with nested (struct) input and output values in Spark 3.0.1?
Note: I am aware of some limitations of older Arrow versions. That is why I force a conda install of pyarrow >= 2, since https://issues.apache.org/jira/browse/ARROW-1644?src=confmacro was resolved recently. However, Spark does not fully support it yet.
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
from pandas import Timestamp
from pyspark.sql.functions import collect_set, struct, col
spark = SparkSession.builder.appName("anomlydetection").master("local[4]").config("spark.driver.memory", "2G").getOrCreate()
metadata = pd.DataFrame({
    'meta_id': {0: '1', 1: '2', 2: '3', 3: '4', 4: '5'},
    'value_a': {0: 6, 1: 6, 2: 6, 3: 4, 4: 7},
    'time_start': {i: Timestamp('2020-08-12 04:29:24') for i in range(5)},
    'time_end': {i: Timestamp('2020-08-12 10:22:23') for i in range(5)},
    'value_b': {i: 15619415.0 for i in range(5)},
})
metadata = spark.createDataFrame(metadata)
metadata = metadata.groupBy(["meta_id"]).agg(
    collect_set(struct(col("time_start"), col("time_end"), col("value_a"), col("value_b"))).alias("metadata")
)
df = pd.DataFrame({'time':['2020-01-01 00:00', '2020-01-01 03:00', '2020-01-01 04:00', '2020-01-06 00:00'], 'category':['1','1','1','1'], 'value':[5, 8, 7, 2], 'meta_id':[1,2, np.NaN,5]})
df['time'] = pd.to_datetime(df['time'])
df = spark.createDataFrame(df)
df = df.join(metadata, ["meta_id"], "LEFT")
df.printSchema()
df.show(20, False)
which looks like:
root
|-- meta_id: double (nullable = true)
|-- time: timestamp (nullable = true)
|-- category: string (nullable = true)
|-- value: long (nullable = true)
|-- metadata: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- time_start: timestamp (nullable = true)
| | |-- time_end: timestamp (nullable = true)
| | |-- value_a: long (nullable = true)
| | |-- value_b: double (nullable = true)
+-------+-------------------+--------+-----+------------------------------------------------------------+
|meta_id|time |category|value|metadata |
+-------+-------------------+--------+-----+------------------------------------------------------------+
|NaN |2020-01-01 04:00:00|1 |7 |null |
|1.0 |2020-01-01 00:00:00|1 |5 |[[2020-08-12 04:29:24, 2020-08-12 10:22:23, 6, 1.5619415E7]]|
|2.0 |2020-01-01 03:00:00|1 |8 |[[2020-08-12 04:29:24, 2020-08-12 10:22:23, 6, 1.5619415E7]]|
|5.0 |2020-01-06 00:00:00|1 |2 |[[2020-08-12 04:29:24, 2020-08-12 10:22:23, 7, 1.5619415E7]]|
+-------+-------------------+--------+-----+------------------------------------------------------------+
def s2s(time: pd.Series, metadata: pd.DataFrame) -> pd.DataFrame:
    """We must use DataFrames to represent the structs."""
    # Iterate over all the time_start/time_end pairs and test for overlap
    # with the time column. The matching logic is not implemented for the
    # sake of brevity; instead (to make debugging easier) there is only a
    # loop which prints the contents of metadata.
    print(metadata)
    if metadata is not None:
        for m in metadata:
            print(m)
    print('***')
    return pd.DataFrame({'overlap': False, 'overlap_value_a': -1, 'overlap_value_b': -1}, index=[0])
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StructType
s2s = pandas_udf(s2s, returnType=StructType())
df.select(s2s(col("time"), col("metadata"))).show()
This fails with:
0 None
Name: _1, dtype: object
None
***
ValueError: not enough values to unpack (expected 2, got 0)
But I am already checking for NULLs inside the function - so what is going wrong here?
【Discussion】:
- pd.DataFrame maps to a StructType, but the metadata column is an array of StructType, which I don't think the current pandas_udf supports. You could cast the data from an array of structs to an array of strings, e.g. metadata = metadata.groupBy("meta_id").agg(collect_set(concat_ws(',', "time_start", "time_end", "value_a", "value_b")).alias("metadata")), and then split each entry back into 4 fields in pandas. Also, if any of the 4 columns can be NULL, you need e.g. coalesce("time_start", "") so that the fields stay correctly aligned after the split.
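The pandas side of that workaround could look like the sketch below. `split_metadata` is a hypothetical helper (not from the question), and it assumes the field order time_start, time_end, value_a, value_b produced by the concat_ws call above, with empty strings standing in for NULLs:

```python
import pandas as pd

def split_metadata(entries):
    """Split an array of 'time_start,time_end,value_a,value_b' strings
    back into a DataFrame with one row per original struct."""
    if entries is None:
        # NULL metadata (unmatched join) becomes an empty frame.
        return pd.DataFrame(columns=["time_start", "time_end", "value_a", "value_b"])
    rows = [e.split(",") for e in entries]
    out = pd.DataFrame(rows, columns=["time_start", "time_end", "value_a", "value_b"])
    # Empty strings come from coalesce(col, '') on the Spark side; map them back to NA.
    out = out.replace("", pd.NA)
    out["time_start"] = pd.to_datetime(out["time_start"])
    out["time_end"] = pd.to_datetime(out["time_end"])
    out["value_a"] = pd.to_numeric(out["value_a"])
    out["value_b"] = pd.to_numeric(out["value_b"])
    return out

parsed = split_metadata(["2020-08-12 04:29:24,2020-08-12 10:22:23,6,15619415.0"])
print(parsed)
```

Inside the pandas_udf you would call this per row on the array<string> column before running the overlap check.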
- I was hoping Arrow 2.x would fix it - but you may well be right that Spark itself does not yet know enough about these new features.
- Would a regular UDF work? I.e. without Arrow?
- Yes, in a regular UDF a StructType is converted to a Row object, a MapType to a dict, and an ArrayType to a list. Combinations of nested data types should work as well.
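A sketch of that regular-UDF route. The Row objects are emulated here with a namedtuple (Row also supports attribute access) so the matching logic runs standalone; the overlap rule and the -1 sentinel values are assumptions for illustration. In Spark you would wrap check_overlap with udf(...) and a StructType return schema of three fields:

```python
from collections import namedtuple
from datetime import datetime

# In a regular (non-Arrow) UDF, Spark hands an array<struct> column to Python
# as a list of Row objects; attribute access works just like on this namedtuple.
Meta = namedtuple("Meta", ["time_start", "time_end", "value_a", "value_b"])

def check_overlap(time, metadata):
    """Return (overlap, value_a, value_b) for the first struct whose
    [time_start, time_end] interval contains `time`."""
    if metadata is None:
        return (False, -1, -1.0)
    for m in metadata:
        if m.time_start <= time <= m.time_end:
            return (True, m.value_a, m.value_b)
    return (False, -1, -1.0)

meta = [Meta(datetime(2020, 8, 12, 4, 29, 24),
             datetime(2020, 8, 12, 10, 22, 23), 6, 15619415.0)]
print(check_overlap(datetime(2020, 8, 12, 5, 0), meta))   # (True, 6, 15619415.0)
print(check_overlap(datetime(2020, 1, 1, 0, 0), meta))    # (False, -1, -1.0)
```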
Tags: apache-spark pyspark apache-spark-sql user-defined-functions