【问题标题】:Pyspark Series2Series UDF with Array of Structs as input and Struct as outputPyspark Series2Series UDF,结构数组作为输入,结构作为输出
【发布时间】:2021-03-04 07:53:28
【问题描述】:

如何在 spark 中构造一个包含 spark 3.0.1 的嵌套(结构)输入和输出值的 UDF?

注意:我知道旧版本 Arrow 的某些限制。这就是为什么我强制使用 conda 安装 pyarror >= 2,因为 https://issues.apache.org/jira/browse/ARROW-1644?src=confmacro 最近已解决。但是,spark 还不知道(完全支持它)。

from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
from pandas import Timestamp
from pyspark.sql.functions import collect_set, struct, col
spark = SparkSession.builder.appName("anomlydetection").master("local[4]").config("spark.driver.memory", "2G").getOrCreate()

metadata = pd.DataFrame({'meta_id': {0: '1', 1: '2', 2: '3', 3: '4', 4: '5'}, 'value_a': {0: 6, 1: 6, 2: 6, 3: 4, 4: 7}, 'time_start': {0: Timestamp('2020-08-12 04:29:24'), 1: Timestamp('2020-08-12 04:29:24'), 2: Timestamp('2020-08-12 04:29:24'), 3: Timestamp('2020-08-12 04:29:24'), 4: Timestamp('2020-08-12 04:29:24')}, 'time_end': {0: Timestamp('2020-08-12 10:22:23'), 1: Timestamp('2020-08-12 10:22:23'), 2: Timestamp('2020-08-12 10:22:23'), 3: Timestamp('2020-08-12 10:22:23'), 4: Timestamp('2020-08-12 10:22:23')}, 'value_b': {0: 15619415.0, 1: 15619415.0, 2: 15619415.0, 3: 15619415.0, 4: 15619415.0}})
metadata = spark.createDataFrame(metadata)
metadata = metadata.groupBy(["meta_id"]).agg(collect_set(struct(col("time_start"), col("time_end"), col("value_a"), col("value_b"))).alias("metadata"))

df = pd.DataFrame({'time':['2020-01-01 00:00', '2020-01-01 03:00', '2020-01-01 04:00', '2020-01-06 00:00'], 'category':['1','1','1','1'], 'value':[5, 8, 7, 2], 'meta_id':[1,2, np.NaN,5]})
df['time'] = pd.to_datetime(df['time'])
df = spark.createDataFrame(df)

df = df.join(metadata, ["meta_id"], "LEFT")
df.printSchema()
df.show(20, False)

看起来像:

root
 |-- meta_id: double (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- category: string (nullable = true)
 |-- value: long (nullable = true)
 |-- metadata: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- time_start: timestamp (nullable = true)
 |    |    |-- time_end: timestamp (nullable = true)
 |    |    |-- value_a: long (nullable = true)
 |    |    |-- value_b: double (nullable = true)


-------+-------------------+--------+-----+------------------------------------------------------------+
|meta_id|time               |category|value|metadata                                                    |
+-------+-------------------+--------+-----+------------------------------------------------------------+
|NaN    |2020-01-01 04:00:00|1       |7    |null                                                        |
|1.0    |2020-01-01 00:00:00|1       |5    |[[2020-08-12 04:29:24, 2020-08-12 10:22:23, 6, 1.5619415E7]]|
|2.0    |2020-01-01 03:00:00|1       |8    |[[2020-08-12 04:29:24, 2020-08-12 10:22:23, 6, 1.5619415E7]]|
|5.0    |2020-01-06 00:00:00|1       |2    |[[2020-08-12 04:29:24, 2020-08-12 10:22:23, 7, 1.5619415E7]]|
+-------+-------------------+--------+-----+------------------------------------------------------------+

def s2s(time: pd.Series, metadata: pd.DataFrame) -> pd.DataFrame:
    """We must use DataFrames to represent the structs"""
    # iterate over all the timestamp start/end and test for overlap with the time column.
    # matching logic is not implemented for sake of brevity
    # instead (and to better debug only a loop which prints the contents of metadata)
    print(metadata)
    if metadata is not None:
        for m in metadata:
            print(m)
    print('***')
    return pd.DataFrame({'overlap': False, 'overlap_value_a': -1, 'overlap_value_b':-1}, index=[0])

from pyspark.sql.functions import col, pandas_udf
s2s = pandas_udf(s2s, returnType=StructType())

df.select(s2s(col("time"), col("metadata"))).show()

失败:

0    None
Name: _1, dtype: object
None
***
ValueError: not enough values to unpack (expected 2, got 0)

但我已经在检查内部是否存在 NULL - 这里出了什么问题?

【问题讨论】:

  • pd.DataFrame 映射到 StructType,但元数据列是array of StructType,不要认为当前的 pandas_udf 支持。您可能可以将数据类型从结构数组转换为字符串数组,例如:metadata = metadata.groupBy("meta_id").agg(collect_set(concat_ws(',',"time_start","time_end", "value_a","value_b")).alias("metadata")),然后在 pandas 中将它们拆分为 4 个字段。此外,如果 4 列中的任何一个为 NULL,则需要执行例如 coalease("time_start", ""),以便字段在拆分后正确对齐。
  • 我希望使用 Arrow 2.x 可以修复它 - 但您可能是对的,Spark 本身还没有足够了解这些新功能。
  • 常规的 UDF 会起作用吗? IE。没有箭头?
  • 是的,在常规 UDF 中,StructType 转换为 Row 对象,MapType 转换为 dict,ArrayType 转换为 list。嵌套数据类型的组合也应该起作用。

标签: apache-spark pyspark apache-spark-sql user-defined-functions


【解决方案1】:

它有效 - 您只需使用 pandas.DataFrame 作为输出并为生成的结构/数组映射适当的类型。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-04-22
    • 1970-01-01
    • 2018-07-25
    • 1970-01-01
    • 1970-01-01
    • 2021-03-02
    • 2012-07-12
    • 1970-01-01
    相关资源
    最近更新 更多