【问题标题】:Return complex nested array type from UDF pyspark从 UDF pyspark 返回复杂的嵌套数组类型
【发布时间】:2021-04-13 01:05:59
【问题描述】:

在文末更新了更多问题

我需要在 pyspark 中使用 UDF 为 df 创建新列。 UDF 必须返回嵌套数组,格式为:

    [
        [before], [after], [from_tbl], [where_tbl], [to_tbl], [lst_tbl], [db_info]
    ]
        
with:
-----------------
before, after = [
                        [query_type,out,[from],[where]],
                        [query_type,out,[from],[where]]
                ]
-----------------
to_tbl = [write_mode, [table_name], table_action]
-----------------
from_tbl, where_tbl, from, where, table_name, lst_tbl, db_info = [a,b,c]

我定义了从 UDF 返回的模式,例如:

schema_return = T.StructType([
T.StructField('before', T.ArrayType(T.StructType([
    T.StructField('query_type', T.StringType(), True),
    T.StructField('out', T.StringType(), True),
    T.StructField('from', T.ArrayType(T.StringType(), True), True),
    T.StructField('where', T.ArrayType(T.StringType(), True), True),
])), True),
T.StructField('after', T.ArrayType(T.StructType([
    T.StructField('query_type', T.StringType(), True),
    T.StructField('out', T.StringType(), True),
    T.StructField('from', T.ArrayType(T.StringType(), True), True),
    T.StructField('where', T.ArrayType(T.StringType(), True), True),
])), True),
T.StructField('from_tbl', T.ArrayType(T.StringType(), True), True),
T.StructField('where_tbl', T.ArrayType(T.StringType(), True), True),
T.StructField('to_tbl', T.StructType([
    T.StructField('write_mode', T.StringType(), True),
    T.StructField('table_name', T.ArrayType(T.StringType(), True), True),
    T.StructField('table_action', T.StringType(), True),
]), True),
T.StructField('lst_tbl', T.ArrayType(T.StringType(), True), True),
T.StructField('db_info', T.ArrayType(T.StringType(), True), True)

])

    @F.udf(returnType=schema_return)
    def udf(parameter):
...

我收到一个错误:

原因:java.lang.IllegalStateException:输入行没有架构所需的预期值数量。需要 7 个字段,而提供 0 个值。

我按照本教程进行操作:https://prodevsblog.com/questions/123979/how-to-return-a-tuple-type-in-a-udf-in-pyspark/ 举例:

schema = StructType([
    StructField("min", FloatType(), True),
    StructField("size", IntegerType(), True),
    StructField("edges",  ArrayType(FloatType()), True),
    StructField("val_to_index",  MapType(FloatType(), IntegerType()), True)
    # StructField('insanity', StructType([StructField("min_", FloatType(), True), StructField("size_", IntegerType(), True)]))

])

def func(values):
  mn = min(values)
  size = len(values)
  lst = sorted(values)[::-1]
  val_to_index = {x: i for i, x in enumerate(values)}
  return (mn, size, lst, val_to_index)

func = udf(func, schema)
dff = df.select('*', func('y[]').alias('complex_type'))
dff.show(10, False)

# +---+----------+------------------------------------------------------+
# |x  |y[]       |complex_type                                          |
# +---+----------+------------------------------------------------------+
# |0.0|[0.0, 3.0]|[0.0,2,WrappedArray(3.0, 0.0),Map(0.0 -> 0, 3.0 -> 1)]|
# |1.0|[6.0, 9.0]|[6.0,2,WrappedArray(9.0, 6.0),Map(9.0 -> 1, 6.0 -> 0)]|
# +---+----------+------------------------------------------------------+

我哪里错了?以及如何为上面的嵌套数组定义模式。

这是我的 UDF 返回的内容

return [before, after, from_tbl, where_tbl, to_tbl, list(set(lst_tbl)), dbinfo]
or 
return [] # maybe this is cause

更新更多

@mck 谈话后我不要返回 []。我将 return [] 替换为返回 None。但是我收到了更多错误,与第一个错误相同,例如:

原因:java.lang.IllegalStateException:输入行没有架构所需的预期值数量。需要 3 个字段,而提供 0 个值

有架构

schema_return = T.StructType([
    T.StructField('before', T.ArrayType(T.StructType([
        T.StructField('query_type', T.StringType(), True),
        T.StructField('out', T.StringType(), True),
        T.StructField('from', T.ArrayType(T.StringType(), True), True),
        T.StructField('where', T.ArrayType(T.StringType(), True), True),
    ])), True),
    T.StructField('after', T.ArrayType(T.StructType([
        T.StructField('query_type', T.StringType(), True),
        T.StructField('out', T.StringType(), True),
        T.StructField('from', T.ArrayType(T.StringType(), True), True),
        T.StructField('where', T.ArrayType(T.StringType(), True), True),
    ])), True),
    T.StructField('from_tbl', T.ArrayType(T.StringType(), True), True),
    T.StructField('where_tbl', T.ArrayType(T.StringType(), True), True),
    T.StructField('to_tbl', T.StructType([
        T.StructField('write_mode', T.StringType(), True),
        T.StructField('table_name', T.ArrayType(T.StringType(), True), True),
        T.StructField('table_action', T.StringType(), True),
    ]), True),
    T.StructField('lst_tbl', T.ArrayType(T.StringType(), True), True),
    T.StructField('db_info', T.ArrayType(T.StringType(), True), True)
])

基于错误值的数量 = 3。我猜原因来自

T.StructField('to_tbl', T.StructType([
    T.StructField('write_mode', T.StringType(), True),
    T.StructField('table_name', T.ArrayType(T.StringType(), True), True),
    T.StructField('table_action', T.StringType(), True),
]), True),

我的列表:[before]、[after]、[from_tbl]、[where_tbl]、[to_tbl]、[lst_tbl]、[db_info] 将有嵌套元素 = [] 如果条件不是满足。如果我将 [] 替换为无。它影响到逻辑代码的最后。我怎样才能保留 [] 而不是 None。以及为什么这会导致错误 非常感谢

【问题讨论】:

  • 介意展示你的UDF吗?
  • 当然不能返回[],但是可以返回None
  • @mck 是的。我替换 return [] 以返回 None。这是工作。但是数组中的内部值也可能是 [] 空列表。如何返回 [] insted of None。第二个问题是:当我将数据转储到文件 csv.结果具有如下格式:Row(before=[], after=[], from_tbl=['STG_EMB_CUST_RET'], where_tbl=[], to_tbl=[], lst_tbl=['STG_EMB_CUST_RET'], db_info=['NZSQL' , 'PRD_STAGE', 'mbprd', 'NetezzaConnectorPX']) 我希望它在 python 中有格式列表:[[], [], ['STG_EMB_CUST_RET'], [], [],['STG_EMB_CUST_RET'], [ 'NZSQL', 'PRD_STAGE', 'mbprd', 'NetezzaConnectorPX'] 从文件重新加载到 df

标签: apache-spark pyspark user-defined-functions


【解决方案1】:

我做到了。如果返回结构类型。需要返回无。不返回 [] 包含嵌套元素。非常感谢@mck

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-08-18
    • 1970-01-01
    • 2017-02-06
    • 1970-01-01
    • 2022-11-24
    • 2012-08-10
    • 2019-02-26
    相关资源
    最近更新 更多